You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2022/03/15 22:15:14 UTC
[phoenix] branch 5.1 updated: PHOENIX-6501 Use batching when joining data table rows with uncovered global index rows (#1399)
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 859f46b PHOENIX-6501 Use batching when joining data table rows with uncovered global index rows (#1399)
859f46b is described below
commit 859f46b8e9cd81cd643ca4413196852267de6be5
Author: kadirozde <37...@users.noreply.github.com>
AuthorDate: Tue Mar 15 15:12:08 2022 -0700
PHOENIX-6501 Use batching when joining data table rows with uncovered global index rows (#1399)
---
.../end2end/index/GlobalIndexCheckerIT.java | 80 ----
.../index/UncoveredGlobalIndexRegionScannerIT.java | 396 ++++++++++++++++++++
.../coprocessor/BaseScannerRegionObserver.java | 4 +-
.../UncoveredGlobalIndexRegionScanner.java | 405 +++++++++++++++++++++
.../phoenix/iterate/RegionScannerFactory.java | 60 +--
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 3 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 33 +-
8 files changed, 861 insertions(+), 122 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 8eb0aac..95f0860 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -477,86 +477,6 @@ public class GlobalIndexCheckerIT extends BaseTest {
}
@Test
- public void testUncoveredGlobalIndex() throws Exception {
- if (async) {
- return;
- }
- String dataTableName = generateUniqueName();
- populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- String indexTableName = generateUniqueName();
- conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
- dataTableName + " (val1) include (val2)" + this.indexDDLOptions);
- // Verify that without hint, the index table is not selected
- assertIndexTableNotSelected(conn, dataTableName, indexTableName,
- "SELECT val3 from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')");
-
- //Verify that with index hint, we will read from the index table even though val3 is not included by the index table
- String selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from "
- + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')";
- assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
- ResultSet rs = conn.createStatement().executeQuery(selectSql);
- assertTrue(rs.next());
- assertEquals("bcde", rs.getString(1));
- assertFalse(rs.next());
- conn.createStatement().execute("DROP INDEX " + indexTableName + " on " + dataTableName);
- // Create an index does not include any columns
- indexTableName = generateUniqueName();
- conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
- dataTableName + " (val1)" + this.indexDDLOptions);
- conn.commit();
-
- // Verify that without hint, the index table is not selected
- assertIndexTableNotSelected(conn, dataTableName, indexTableName,
- "SELECT id from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')");
- selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ id from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')";
- //Verify that we will read from the index table
- assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
- rs = conn.createStatement().executeQuery(selectSql);
- assertTrue(rs.next());
- assertEquals("b", rs.getString(1));
- assertFalse(rs.next());
-
- // Add another row and run a group by query where the uncovered index should be used
- conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
- conn.commit();
- // Verify that without hint, the index table is not selected
- assertIndexTableNotSelected(conn, dataTableName, indexTableName,
- "SELECT count(*) from " + dataTableName + " where val1 > '0' GROUP BY val1");
- selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(*) from " + dataTableName + " where val1 > '0' GROUP BY val1";
- //Verify that we will read from the index table
- assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
- rs = conn.createStatement().executeQuery(selectSql);
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertFalse(rs.next());
- selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0'";
- //Verify that we will read from the index table
- assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
- rs = conn.createStatement().executeQuery(selectSql);
- assertTrue(rs.next());
- assertEquals(3, rs.getInt(1));
- // Run an order by query where the uncovered index should be used
- // Verify that without hint, the index table is not selected
- assertIndexTableNotSelected(conn, dataTableName, indexTableName,
- "SELECT val3 from " + dataTableName + " where val1 > '0' ORDER BY val1");
- selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from " + dataTableName + " where val1 > '0' ORDER BY val1";
- //Verify that we will read from the index table
- assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
- rs = conn.createStatement().executeQuery(selectSql);
- assertTrue(rs.next());
- assertEquals("abcd", rs.getString(1));
- assertTrue(rs.next());
- assertEquals("cdef", rs.getString(1));
- assertTrue(rs.next());
- assertEquals("bcde", rs.getString(1));
- assertFalse(rs.next());
- }
- }
-
- @Test
public void testSimulateConcurrentUpdates() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
new file mode 100644
index 0000000..93d4d80
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class UncoveredGlobalIndexRegionScannerIT extends BaseTest { // hinted queries served from uncovered global indexes
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @After
+ public void unsetFailForTesting() throws Exception {
+ boolean refCountLeaked = isAnyStoreRefCountLeaked();
+ assertFalse("refCount leaked", refCountLeaked);
+ }
+
+ private void populateTable(String tableName) throws Exception { // creates tableName with rows 'a' and 'b'
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("create table " + tableName +
+ " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))");
+ conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+ conn.commit();
+ conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+ conn.commit();
+ }
+ }
+
+ @Test
+ public void testUncoveredIndexWithPhoenixRowTimestamp() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexTableName = generateUniqueName();
+ Timestamp initial = new Timestamp(EnvironmentEdgeManager.currentTimeMillis() - 1);
+ conn.createStatement().execute("create table " + dataTableName +
+ " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))");
+ conn.createStatement().execute("upsert into " + dataTableName + " values ('a', 'ab', 'abc', 'abcd')");
+ conn.commit();
+ Timestamp before = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ // Sleep 1ms so that row 'b' gets a timestamp strictly after 'before'
+ Thread.sleep(1);
+ conn.createStatement().execute("upsert into " + dataTableName + " values ('b', 'bc', 'bcd', 'bcde')");
+ conn.commit();
+ Timestamp after = new Timestamp(EnvironmentEdgeManager.currentTimeMillis() + 1);
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1, PHOENIX_ROW_TIMESTAMP()) ");
+
+ String timeZoneID = Calendar.getInstance().getTimeZone().getID();
+ // Write a query to get the row with val1 = 'bc' within a time range
+ String query = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ "
+ + "val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName
+ + " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('"
+ + before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '"
+ + timeZoneID + "') AND " + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after
+ + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, query, dataTableName, indexTableName);
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("bc", rs.getString(1));
+ assertEquals("bcd", rs.getString(2));
+ assertTrue(rs.getTimestamp(3).after(before));
+ assertTrue(rs.getTimestamp(3).before(after));
+ assertFalse(rs.next());
+ // Count the number of index rows
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexTableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ // Add one more row with val1 ='bc' and check this does not change the result of the previous
+ // query, since the new row's timestamp falls outside the (before, after) range
+ // Sleep 1ms so that the new row gets a timestamp distinct from the earlier rows
+ Thread.sleep(1);
+ conn.createStatement().execute("upsert into " + dataTableName + " values ('c', 'bc', 'ccc', 'cccc')");
+ conn.commit();
+ assertExplainPlan(conn, query, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("bc", rs.getString(1));
+ assertEquals("bcd", rs.getString(2));
+ assertTrue(rs.getTimestamp(3).after(before));
+ assertTrue(rs.getTimestamp(3).before(after));
+ assertFalse(rs.next());
+ // Write a time range query to get the last row with val1 ='bc'
+ query = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ "
+ +"val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName +
+ " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after
+ + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, query, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("bc", rs.getString(1));
+ assertEquals("ccc", rs.getString(2));
+ assertTrue(rs.getTimestamp(3).after(after));
+ assertFalse(rs.next());
+ // Verify that we can execute the same query without using the index
+ String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " +
+ "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+ // Verify that we will read from the data table
+ rs = conn.createStatement().executeQuery("EXPLAIN " + noIndexQuery);
+ String explainPlan = QueryUtil.getExplainPlan(rs);
+ assertTrue(explainPlan.contains("FULL SCAN OVER " + dataTableName));
+ rs = conn.createStatement().executeQuery(noIndexQuery);
+ assertTrue(rs.next());
+ assertEquals("bc", rs.getString(1));
+ assertEquals("ccc", rs.getString(2));
+ assertTrue(rs.getTimestamp(3).after(after));
+ after = rs.getTimestamp(3);
+ assertFalse(rs.next());
+ // Sleep 1ms so that row 'd' gets a timestamp strictly after row 'c'
+ Thread.sleep(1);
+ conn.createStatement().execute("upsert into " + dataTableName + " values ('d', 'de', 'def', 'defg')");
+ conn.commit();
+
+ query = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ "
+ + " val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName
+ + " WHERE val1 = 'de'";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, query, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("de", rs.getString(1));
+ assertEquals("def", rs.getString(2));
+ assertTrue(rs.getTimestamp(3).after(after));
+ assertFalse(rs.next());
+ // Add a new index where the index row key starts with PHOENIX_ROW_TIMESTAMP()
+ indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (PHOENIX_ROW_TIMESTAMP()) ");
+ // Add one more row
+ // Sleep 1ms so that row 'e' gets a timestamp strictly after row 'd'
+ Thread.sleep(1);
+ conn.createStatement().execute("upsert into " + dataTableName + " values ('e', 'ae', 'efg', 'efgh')");
+ conn.commit();
+ // Write a query to get all the rows in the order of their timestamps
+ query = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ "
+ + " val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE "
+ + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial
+ + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, query, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("ab", rs.getString(1));
+ assertEquals("abc", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("bc", rs.getString(1));
+ assertEquals("bcd", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("bc", rs.getString(1));
+ assertEquals("ccc", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("de", rs.getString(1));
+ assertEquals("def", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("ae", rs.getString(1));
+ assertEquals("efg", rs.getString(2));
+ assertFalse(rs.next());
+ }
+ }
+
+ private void assertIndexTableNotSelected(Connection conn, String dataTableName, String indexTableName, String sql)
+ throws Exception {
+ try {
+ assertExplainPlan(conn, sql, dataTableName, indexTableName);
+ throw new RuntimeException("The index table should not be selected without an index hint"); // not an AssertionError, so it escapes the catch below
+ } catch (AssertionError error){
+ // expected: assertExplainPlan signals with an AssertionError that the plan does not use indexTableName
+ }
+ }
+
+ @Test
+ public void testUncoveredGlobalIndex() throws Exception {
+ String dataTableName = generateUniqueName();
+ populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2)");
+ // Verify that without hint, the index table is not selected
+ assertIndexTableNotSelected(conn, dataTableName, indexTableName,
+ "SELECT val3 from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')");
+
+ //Verify that with index hint, we will read from the index table even though val3 is not included by the index table
+ String selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from "
+ + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')";
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("bcde", rs.getString(1));
+ assertFalse(rs.next());
+ conn.createStatement().execute("DROP INDEX " + indexTableName + " on " + dataTableName);
+ // Create an index that does not include any columns
+ indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1)");
+ conn.commit();
+
+ // Verify that without hint, the index table is not selected
+ assertIndexTableNotSelected(conn, dataTableName, indexTableName,
+ "SELECT id from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')");
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ id from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')";
+ //Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertFalse(rs.next());
+
+ // Add another row and run a group by query where the uncovered index should be used
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
+ conn.commit();
+ // Verify that without hint, the index table is not selected
+ assertIndexTableNotSelected(conn, dataTableName, indexTableName,
+ "SELECT count(val3) from " + dataTableName + " where val1 > '0' GROUP BY val1");
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0' GROUP BY val1";
+ //Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0'";
+ //Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ // Run an order by query where the uncovered index should be used
+ // Verify that without hint, the index table is not selected
+ assertIndexTableNotSelected(conn, dataTableName, indexTableName,
+ "SELECT val3 from " + dataTableName + " where val1 > '0' ORDER BY val1");
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from " + dataTableName + " where val1 > '0' ORDER BY val1";
+ //Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("abcd", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("cdef", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("bcde", rs.getString(1));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testSkipScanFilter() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + dataTableName
+ + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, "
+ + "v2 INTEGER, v3 INTEGER "
+ + "CONSTRAINT pk PRIMARY KEY (k1,k2)) "
+ + " COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+ TestUtil.addCoprocessor(conn, dataTableName, ScanFilterRegionObserver.class);
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON "
+ + dataTableName + "(v1) include (v2)");
+ final int nIndexValues = 97;
+ final Random rand = new Random(7);
+ final int batchSize = 100;
+ for (int i = 0; i < 10000; i++) {
+ conn.createStatement().execute(
+ "UPSERT INTO " + dataTableName + " VALUES (" + i + ", 1, "
+ + (rand.nextInt() % nIndexValues) + ", "
+ + rand.nextInt() + ", 1)");
+ if ((i % batchSize) == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ String selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName
+ + ")*/ SUM(v3) from " + dataTableName + " GROUP BY v1";
+ ScanFilterRegionObserver.count.set(0); // static counter: count only the key ranges of the query below
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ int sum = 0;
+ while (rs.next()) {
+ sum += rs.getInt(1);
+ }
+ assertEquals(10000, sum);
+ // UncoveredGlobalIndexRegionScanner uses the skip scan filter to retrieve data table
+ // rows. Verify that the skip scan filter is used
+ assertEquals(10000, ScanFilterRegionObserver.count.get());
+ }
+ }
+
+ @Test
+ public void testCount() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + dataTableName
+ + "(k1 BIGINT NOT NULL, k2 BIGINT NOT NULL, v1 INTEGER, "
+ + "v2 INTEGER, v3 BIGINT "
+ + "CONSTRAINT pk PRIMARY KEY (k1,k2)) "
+ + " VERSIONS=1, IMMUTABLE_ROWS=TRUE");
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON "
+ + dataTableName + "(v1) include (v2)");
+ final int nIndexValues = 9;
+ final Random rand = new Random(7);
+ final int batchSize = 1000;
+ for (int i = 0; i < 100000; i++) {
+ conn.createStatement().execute(
+ "UPSERT INTO " + dataTableName + " VALUES (" + i + ", 1, "
+ + (rand.nextInt() % nIndexValues) + ", "
+ + rand.nextInt() + ", " + rand.nextInt()+ ")");
+ if ((i % batchSize) == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ String selectSql = "SELECT /*+ NO_INDEX */ Count(v3) from " + dataTableName + " where v1 = 5"; // baseline from the data table
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ long count = rs.getLong(1);
+ selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName
+ + ")*/ Count(v3) from " + dataTableName + " where v1 = 5";
+ //Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(count, rs.getLong(1));
+ }
+ }
+
+ public static class ScanFilterRegionObserver extends SimpleRegionObserver { // counts skip-scan key ranges of scans opened on the table it is attached to
+ public static final AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Scan scan) {
+ if (scan.getFilter() instanceof SkipScanFilter) {
+ List<List<KeyRange>> slots = ((SkipScanFilter)scan.getFilter()).getSlots();
+ for (List<KeyRange> ranges : slots) {
+ count.addAndGet(ranges.size());
+ }
+ }
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b2ef71c..2e5fbc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -73,6 +73,7 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
// The number of index rows to be rebuild in one RPC call
public static final String INDEX_REBUILD_PAGE_ROWS = "_IndexRebuildPageRows";
+ public static final String INDEX_PAGE_ROWS = "_IndexPageRows";
public static final String SERVER_PAGE_SIZE_MS = "_ServerPageSizeMs";
// Index verification type done by the index tool
public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
@@ -387,7 +388,8 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
final Region dataRegion, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final TupleProjector projector,
- final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {
+ final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex)
+ throws IOException {
RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
new file mode 100644
index 0000000..f5206a7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.hbase.index.parallel.*;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an index table region scanner which scans index table rows locally and then extracts
+ * data table row keys from them. Using the data table row keys, the data table rows are scanned
+ * using the HBase client available to region servers.
+ */
+public class UncoveredGlobalIndexRegionScanner extends BaseRegionScanner {
+    public static final String NUM_CONCURRENT_INDEX_THREADS_CONF_KEY = "index.threads.max";
+    public static final int DEFAULT_CONCURRENT_INDEX_THREADS = 16;
+    public static final String INDEX_ROW_COUNTS_PER_TASK_CONF_KEY = "index.row.count.per.task";
+    public static final int DEFAULT_INDEX_ROW_COUNTS_PER_TASK = 2048;
+
+    /**
+     * The states of processing a page of index rows.
+     * <ul>
+     * <li>INITIAL: a new page is about to start and the per-page buffers are reset.</li>
+     * <li>SCANNING_INDEX: index rows of the current page are read from the inner scanner.</li>
+     * <li>SCANNING_DATA: the data table rows for the page are being (re)fetched.</li>
+     * <li>SCANNING_DATA_INTERRUPTED: set by a data scan task that ran past pageSizeMs.</li>
+     * <li>READY: the page is joined and index rows are returned one per next() call.</li>
+     * </ul>
+     */
+    private enum State {
+        INITIAL, SCANNING_INDEX, SCANNING_DATA, SCANNING_DATA_INTERRUPTED, READY
+    }
+
+    // Written both by the thread calling next() and by the data scan worker threads
+    // (see scanDataRows), so it must be volatile for cross-thread visibility.
+    volatile State state = State.INITIAL;
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(UncoveredGlobalIndexRegionScanner.class);
+    protected final byte[][] viewConstants;
+    protected final RegionCoprocessorEnvironment env;
+    protected byte[][] regionEndKeys;
+    protected final Table dataHTable;
+    protected final int pageSizeInRows;
+    protected final int rowCountPerTask;
+    protected final Scan scan;
+    protected final Scan dataTableScan;
+    protected final RegionScanner innerScanner;
+    protected final Region region;
+    protected final IndexMaintainer indexMaintainer;
+    protected final TupleProjector tupleProjector;
+    protected final ImmutableBytesWritable ptr;
+    protected final TaskRunner pool;
+    // Set from worker threads when a task fails; read when reporting the failure.
+    protected volatile String exceptionMessage;
+    protected final HTableFactory hTableFactory;
+    protected List<List<Cell>> indexRows = null;
+    protected Map<ImmutableBytesPtr, Result> dataRows = null;
+    protected Iterator<List<Cell>> indexRowIterator = null;
+    protected Map<byte[], byte[]> indexToDataRowKeyMap = null;
+    protected int indexRowCount = 0;
+    protected final long pageSizeMs;
+    protected byte[] lastIndexRowKey = null;
+
+    /**
+     * Creates the scanner.
+     *
+     * The number of index rows per page comes from the scan attribute
+     * {@code BaseScannerRegionObserver.INDEX_PAGE_ROWS} when present, otherwise from the
+     * {@code INDEX_PAGE_SIZE_IN_ROWS} configuration. The data table is the one named by the
+     * {@code PHYSICAL_DATA_TABLE_NAME} scan attribute.
+     *
+     * @throws IOException if the data table or its region end keys cannot be obtained
+     */
+    public UncoveredGlobalIndexRegionScanner(final RegionScanner innerScanner,
+                                             final Region region,
+                                             final Scan scan,
+                                             final RegionCoprocessorEnvironment env,
+                                             final Scan dataTableScan,
+                                             final TupleProjector tupleProjector,
+                                             final IndexMaintainer indexMaintainer,
+                                             final byte[][] viewConstants,
+                                             final ImmutableBytesWritable ptr,
+                                             final long pageSizeMs)
+            throws IOException {
+        super(innerScanner);
+        final Configuration config = env.getConfiguration();
+
+        byte[] pageSizeFromScan =
+                scan.getAttribute(BaseScannerRegionObserver.INDEX_PAGE_ROWS);
+        if (pageSizeFromScan != null) {
+            pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan);
+        } else {
+            pageSizeInRows = (int) config.getLong(INDEX_PAGE_SIZE_IN_ROWS,
+                    QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS);
+        }
+
+        this.indexMaintainer = indexMaintainer;
+        this.viewConstants = viewConstants;
+        this.scan = scan;
+        this.dataTableScan = dataTableScan;
+        this.innerScanner = innerScanner;
+        this.region = region;
+        this.env = env;
+        this.ptr = ptr;
+        this.tupleProjector = tupleProjector;
+        this.pageSizeMs = pageSizeMs;
+        hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
+        rowCountPerTask = config.getInt(INDEX_ROW_COUNTS_PER_TASK_CONF_KEY,
+                DEFAULT_INDEX_ROW_COUNTS_PER_TASK);
+
+        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                new ThreadPoolBuilder("Uncovered Global Index", config)
+                        .setMaxThread(NUM_CONCURRENT_INDEX_THREADS_CONF_KEY,
+                                DEFAULT_CONCURRENT_INDEX_THREADS)
+                        .setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        // The data table region end keys are later passed to getPerTaskDataRowKeys to split
+        // the data row keys of a page into tasks. Both the connection and the region
+        // locator are Closeable and are released here.
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(config);
+             org.apache.hadoop.hbase.client.RegionLocator regionLocator =
+                     connection.getRegionLocator(dataHTable.getName())) {
+            regionEndKeys = regionLocator.getEndKeys();
+        }
+    }
+
+    @Override
+    public long getMvccReadPoint() {
+        return innerScanner.getMvccReadPoint();
+    }
+
+    @Override
+    public RegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    /**
+     * Closes the inner scanner, the data table, the table factory and the task pool.
+     * Every resource is released even if closing an earlier one throws. The data table is
+     * closed before the factory that created it is shut down.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            innerScanner.close();
+        } finally {
+            try {
+                if (dataHTable != null) {
+                    dataHTable.close();
+                }
+            } finally {
+                try {
+                    hTableFactory.shutdown();
+                } finally {
+                    this.pool.stop("UncoveredGlobalIndexRegionScanner is closing");
+                }
+            }
+        }
+    }
+
+    @Override
+    public long getMaxResultSize() {
+        return innerScanner.getMaxResultSize();
+    }
+
+    @Override
+    public int getBatch() {
+        return innerScanner.getBatch();
+    }
+
+    /**
+     * Point-looks-up the given data row keys in the data table and stores the returned rows
+     * in {@code dataRows}. It runs on a pool thread. If the page time limit (pageSizeMs,
+     * measured from {@code startTime}) is exceeded, it stops early and flags the state as
+     * SCANNING_DATA_INTERRUPTED.
+     */
+    private void scanDataRows(Set<byte[]> dataRowKeys, long startTime) throws IOException {
+        List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
+        for (byte[] dataRowKey : dataRowKeys) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey));
+        }
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan dataScan = new Scan(dataTableScan);
+        dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        scanRanges.initializeScan(dataScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
+        try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
+            for (Result result = resultScanner.next(); (result != null);
+                 result = resultScanner.next()) {
+                dataRows.put(new ImmutableBytesPtr(result.getRow()), result);
+                if ((EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+                    LOGGER.info("One of the scan tasks in UncoveredGlobalIndexRegionScanner"
+                            + " for region " + region.getRegionInfo().getRegionNameAsString()
+                            + " could not complete on time (in " + pageSizeMs+ " ms) and"
+                            + " will be resubmitted");
+                    state = State.SCANNING_DATA_INTERRUPTED;
+                    break;
+                }
+            }
+        } catch (Throwable t) {
+            exceptionMessage = "scanDataRows fails for at least one task";
+            ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+        }
+    }
+
+    /**
+     * Adds one task to {@code tasks} that runs {@link #scanDataRows} for the given keys.
+     */
+    private void addTasksForScanningDataTableRowsInParallel(TaskBatch<Boolean> tasks,
+                                                            final Set<byte[]> dataRowKeys,
+                                                            final long startTime) {
+        tasks.add(new Task<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                //in HBase 1.x we could check if the coproc environment was closed or aborted,
+                //but in HBase 2.x the coproc environment can't check region server services
+                if (Thread.currentThread().isInterrupted()) {
+                    exceptionMessage = "Pool closed, not retrieving data table rows for "
+                            + region.getRegionInfo().getRegionNameAsString();
+                    throw new IOException(exceptionMessage);
+                }
+                scanDataRows(dataRowKeys, startTime);
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+    /**
+     * Submits the batch to the pool and waits for all tasks to finish.
+     *
+     * @throws IOException if any task failed; the failed task's exception is the cause
+     */
+    protected void submitTasks(TaskBatch<Boolean> tasks) throws IOException {
+        Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
+        try {
+            LOGGER.debug("Waiting on index tasks to complete...");
+            resultsAndFutures = this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(
+                    "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+        }
+        int index = 0;
+        for (Boolean result : resultsAndFutures.getFirst()) {
+            if (result == null) {
+                // A null result marks a failed task; surface its cause
+                Throwable cause = ServerUtil.getExceptionFromFailedFuture(
+                        resultsAndFutures.getSecond().get(index));
+                throw new IOException(exceptionMessage == null ? "" : exceptionMessage, cause);
+            }
+            index++;
+        }
+    }
+
+    /**
+     * Fetches the data table rows for every index row of the current page in parallel.
+     * Afterwards the state is READY. If a task ran out of time, the state is SCANNING_DATA
+     * instead, and the fetch is repeated on the next call.
+     */
+    private void scanDataTableRows(long startTime) throws IOException {
+        if (indexToDataRowKeyMap.isEmpty()) {
+            state = State.READY;
+            return;
+        }
+        // Deduplicate and sort the data row keys before splitting them into per-task sets
+        TreeSet<byte[]> dataRowKeys = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+        dataRowKeys.addAll(indexToDataRowKeyMap.values());
+        List<Set<byte[]>> setList = IndexRepairRegionScanner.getPerTaskDataRowKeys(dataRowKeys,
+                regionEndKeys, rowCountPerTask);
+        TaskBatch<Boolean> tasks = new TaskBatch<>(setList.size());
+        for (Set<byte[]> taskRowKeys : setList) {
+            addTasksForScanningDataTableRowsInParallel(tasks, taskRowKeys, startTime);
+        }
+        submitTasks(tasks);
+        state = (state == State.SCANNING_DATA_INTERRUPTED) ? State.SCANNING_DATA : State.READY;
+    }
+
+    /**
+     * A page of index rows is scanned, and then the corresponding data table rows are
+     * retrieved from the data table regions in parallel. These data rows are then joined
+     * with the index rows. The join adds the uncovered columns to the index rows.
+     *
+     * This implementation conforms to server paging. If the server-side operation takes
+     * more than pageSizeMs, a dummy result is returned to signal the client that there is
+     * more work to do on the server side. This prevents RPC timeouts.
+     *
+     * @param result receives the joined row, a dummy row, or nothing if the data row of the
+     *               current index row was not found
+     * @return boolean to indicate if there are more rows to scan
+     * @throws IOException if scanning the index or data table fails
+     */
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        boolean hasMore;
+        region.startRegionOperation();
+        try {
+            synchronized (innerScanner) {
+                if (state == State.READY && !indexRowIterator.hasNext()) {
+                    // Every row of the current page has been returned; start a new page
+                    state = State.INITIAL;
+                }
+                if (state == State.INITIAL) {
+                    indexRowCount = 0;
+                    indexRows = new ArrayList<>();
+                    // Filled concurrently by the data scan tasks
+                    dataRows = Maps.newConcurrentMap();
+                    indexToDataRowKeyMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                    state = State.SCANNING_INDEX;
+                }
+                if (state == State.SCANNING_INDEX) {
+                    do {
+                        List<Cell> row = new ArrayList<>();
+                        hasMore = innerScanner.nextRaw(row);
+                        if (!row.isEmpty()) {
+                            if (isDummy(row)) {
+                                result.addAll(row);
+                                // We got a dummy request from lower layers. This means that
+                                // the scan took more than pageSizeMs. Just return true here.
+                                // The client will drop this dummy request and continue to scan.
+                                // Then the lower layer scanner will continue
+                                // wherever it stopped due to this dummy request
+                                return true;
+                            }
+                            lastIndexRowKey = CellUtil.cloneRow(row.get(0));
+                            indexToDataRowKeyMap.put(lastIndexRowKey,
+                                    indexMaintainer.buildDataRowKey(
+                                            new ImmutableBytesWritable(lastIndexRowKey),
+                                            viewConstants));
+                            indexRows.add(row);
+                            indexRowCount++;
+                            if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime)
+                                    >= pageSizeMs) {
+                                getDummyResult(lastIndexRowKey, result);
+                                // We do not need to change the state, State.SCANNING_INDEX
+                                // since we will continue scanning the index table after
+                                // the client drops the dummy request and then calls the next
+                                // method on its ResultScanner within ScanningResultIterator
+                                return true;
+                            }
+                        }
+                    } while (hasMore && indexRowCount < pageSizeInRows);
+                    state = State.SCANNING_DATA;
+                }
+                if (state == State.SCANNING_DATA) {
+                    scanDataTableRows(startTime);
+                    indexRowIterator = indexRows.iterator();
+                }
+                if (state == State.READY) {
+                    if (!indexRowIterator.hasNext()) {
+                        return false;
+                    }
+                    List<Cell> indexRow = indexRowIterator.next();
+                    result.addAll(indexRow);
+                    Result dataRow = dataRows.get(new ImmutableBytesPtr(
+                            indexToDataRowKeyMap.get(CellUtil.cloneRow(indexRow.get(0)))));
+                    if (dataRow != null) {
+                        IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow),
+                                tupleProjector, ptr);
+                    } else {
+                        // The data row does not exist, so this index row is skipped. This
+                        // can happen if the index row has been replicated but the data row
+                        // has not been replicated yet.
+                        result.clear();
+                    }
+                    return true;
+                }
+                // Fetching the data rows ran out of time (state is back to SCANNING_DATA);
+                // return a dummy row so the client calls again and the fetch is retried.
+                getDummyResult(lastIndexRowKey, result);
+                return true;
+            }
+        } catch (Throwable e) {
+            LOGGER.error("Exception in UncoveredGlobalIndexRegionScanner for region "
+                    + region.getRegionInfo().getRegionNameAsString(), e);
+            throw e;
+        } finally {
+            region.closeRegionOperation();
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 3724c44..281eaad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -20,11 +20,14 @@ package org.apache.phoenix.iterate;
import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
-import static org.apache.phoenix.util.ScanUtil.getDummyResult;
-import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
-import static org.apache.phoenix.util.ScanUtil.isDummy;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.phoenix.coprocessor.UncoveredGlobalIndexRegionScanner;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.Cell;
@@ -51,10 +54,6 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.KeyValueSchema;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -115,20 +114,20 @@ public abstract class RegionScannerFactory {
* @param viewConstants
*/
public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env,
- final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs,
+ final RegionScanner regionScanner, final Set<KeyValueColumnExpression> arrayKVRefs,
final Expression[] arrayFuncRefs, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
final Region dataRegion, final IndexMaintainer indexMaintainer,
PhoenixTransactionContext tx,
final byte[][] viewConstants, final KeyValueSchema kvSchema,
final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
- final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) {
+ final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) throws IOException {
return new RegionScanner() {
-
+ private RegionScanner s = regionScanner;
private RegionInfo regionInfo = env.getRegionInfo();
private byte[] actualStartKey = getActualStartKey();
private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
- final long pageSizeMs = getPageSizeMsForRegionScanner(scan);
+ final long pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
Expression extraWhere = null;
long extraLimit = -1;
@@ -159,6 +158,19 @@ public abstract class RegionScannerFactory {
if (limitBytes != null) {
extraLimit = Bytes.toLong(limitBytes);
}
+ if (ScanUtil.isUncoveredGlobalIndex(scan) && tupleProjector != null) {
+ PTable.ImmutableStorageScheme storageScheme = indexMaintainer.getIndexStorageScheme();
+ Scan dataTableScan = new Scan();
+ for (int i = 0; i < dataColumns.length; i++) {
+ if (storageScheme == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ scan.addFamily(dataColumns[i].getFamily());
+ } else {
+ scan.addColumn(dataColumns[i].getFamily(), dataColumns[i].getQualifier());
+ }
+ }
+ s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+ dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs);
+ }
}
}
@@ -173,7 +185,7 @@ public abstract class RegionScannerFactory {
public boolean next(List<Cell> results) throws IOException {
try {
boolean next = s.next(results);
- if (isDummy(results)) {
+ if (ScanUtil.isDummy(results)) {
return true;
}
return next;
@@ -217,7 +229,7 @@ public abstract class RegionScannerFactory {
public boolean nextRaw(List<Cell> result) throws IOException {
try {
boolean next = s.nextRaw(result);
- if (isDummy(result)) {
+ if (ScanUtil.isDummy(result)) {
return true;
}
if (result.size() == 0) {
@@ -225,19 +237,21 @@ public abstract class RegionScannerFactory {
}
if ((ScanUtil.isLocalOrUncoveredGlobalIndex(scan))
&& !ScanUtil.isAnalyzeTable(scan)) {
- if(actualStartKey!=null) {
- next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
- null);
- if (result.isEmpty() || isDummy(result)) {
- return next;
+ if (ScanUtil.isLocalIndex(scan)) {
+ if (actualStartKey != null) {
+ next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
+ null);
+ if (result.isEmpty() || ScanUtil.isDummy(result)) {
+ return next;
+ }
}
- }
/* In the following, c is only used when data region is null.
dataRegion will never be null in case of non-coprocessor call,
therefore no need to refactor
*/
- IndexUtil.wrapResultUsingOffset(env, result, scan, offset, dataColumns,
- tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+ IndexUtil.wrapResultUsingOffset(env, result, scan, offset, dataColumns,
+ tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+ }
if (extraWhere != null) {
Tuple merged = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
@@ -361,7 +375,7 @@ public abstract class RegionScannerFactory {
if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
byte[] rowKey = CellUtil.cloneRow(result.get(0));
result.clear();
- getDummyResult(rowKey, result);
+ ScanUtil.getDummyResult(rowKey, result);
return true;
}
result.clear();
@@ -373,7 +387,7 @@ public abstract class RegionScannerFactory {
if (result.isEmpty()) {
return next;
}
- if (isDummy(result)) {
+ if (ScanUtil.isDummy(result)) {
return true;
}
firstCell = result.get(0);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 78f153c..33d0430 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -334,6 +334,8 @@ public interface QueryServices extends SQLCloseable {
public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled";
// The number of index rows to be rebuild in one RPC call
public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
+ // The number of index rows to be scanned in one RPC call
+ String INDEX_PAGE_SIZE_IN_ROWS = "phoenix.index.page_size_in_rows";
// Flag indicating that server side masking of ttl expired rows is enabled.
public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
// The time limit on the amount of work to be done in one RPC call
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3f24900..379eadd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -340,7 +340,8 @@ public class QueryServicesOptions {
public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
public static final boolean DEFAULT_PHOENIX_SERVER_PAGING_ENABLED = true;
- public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
+ public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32 * 1024;
+ public static final long DEFAULT_INDEX_PAGE_SIZE_IN_ROWS = 32 * 1024;
public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index f1449ff..fee9dba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.util;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
@@ -577,7 +576,21 @@ public class IndexUtil {
whereNode.toSQL(indexResolver, buf);
return QueryUtil.getViewStatement(index.getSchemaName().getString(), index.getTableName().getString(), buf.toString());
}
-
+    /**
+     * Serializes the values that {@code tupleProjector} projects out of {@code tuple} into
+     * a single byte array and appends it to {@code result} as one extra cell. The new cell
+     * reuses the row key and timestamp of the first cell already in {@code result} and is
+     * written under VALUE_COLUMN_FAMILY / VALUE_COLUMN_QUALIFIER.
+     *
+     * @param result the cells of the row being returned; must contain at least one cell
+     * @param tuple the source of the projected values (for example, a data table row)
+     * @param tupleProjector supplies the schema, expressions and value bit set used for
+     *                       serialization
+     * @param ptr pointer passed through to the schema's toBytes call
+     */
+    public static void addTupleAsOneCell(List<Cell> result,
+                                         Tuple tuple,
+                                         TupleProjector tupleProjector,
+                                         ImmutableBytesWritable ptr) {
+        // Capture all projected values of the tuple as one byte[]
+        byte[] projected = tupleProjector.getSchema().toBytes(tuple,
+                tupleProjector.getExpressions(), tupleProjector.getValueBitSet(), ptr);
+        Cell rowTemplate = result.get(0);
+        result.add(PhoenixKeyValueUtil.newKeyValue(
+                rowTemplate.getRowArray(), rowTemplate.getRowOffset(),
+                rowTemplate.getRowLength(), VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER,
+                rowTemplate.getTimestamp(), projected, 0, projected.length));
+    }
public static void wrapResultUsingOffset(final RegionCoprocessorEnvironment environment,
List<Cell> result, final Scan scan, final int offset, ColumnReference[] dataColumns,
TupleProjector tupleProjector, Region dataRegion, IndexMaintainer indexMaintainer,
@@ -611,13 +624,6 @@ public class IndexUtil {
joinResult = table.get(get);
}
}
- } else if (ScanUtil.isUncoveredGlobalIndex(scan)) {
- byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
- try (Table table = ServerUtil.ConnectionFactory.getConnection(
- ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION, environment).
- getTable(TableName.valueOf(dataTableName))) {
- joinResult = table.get(get);
- }
}
// at this point join result has data from the data table. We now need to take this result and
@@ -625,14 +631,7 @@ public class IndexUtil {
// TODO: handle null case (but shouldn't happen)
Tuple joinTuple = new ResultTuple(joinResult);
// This will create a byte[] that captures all of the values from the data table
- byte[] value =
- tupleProjector.getSchema().toBytes(joinTuple, tupleProjector.getExpressions(),
- tupleProjector.getValueBitSet(), ptr);
- Cell keyValue =
- PhoenixKeyValueUtil.newKeyValue(firstCell.getRowArray(),
- firstCell.getRowOffset(),firstCell.getRowLength(), VALUE_COLUMN_FAMILY,
- VALUE_COLUMN_QUALIFIER, firstCell.getTimestamp(), value, 0, value.length);
- result.add(keyValue);
+ addTupleAsOneCell(result, joinTuple, tupleProjector, ptr);
}
ListIterator<Cell> itr = result.listIterator();