You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2024/03/16 00:28:56 UTC
(phoenix) branch 5.1 updated: PHOENIX-7253 Metadata lookup performance improvement for range scan queries (#1848)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 857398f985 PHOENIX-7253 Metadata lookup performance improvement for range scan queries (#1848)
857398f985 is described below
commit 857398f98506d8a7afd566b8b78807e8db5a85a4
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Mar 15 17:28:03 2024 -0700
PHOENIX-7253 Metadata lookup performance improvement for range scan queries (#1848)
---
.../end2end/BaseTenantSpecificViewIndexIT.java | 4 +-
.../end2end/ExplainPlanWithStatsDisabledIT.java | 300 +++++++++++++++++++++
.../end2end/TenantSpecificViewIndexSaltedIT.java | 33 ++-
.../phoenix/end2end/salted/SaltedTableIT.java | 32 +++
.../phoenix/compile/ExplainPlanAttributes.java | 21 +-
.../phoenix/iterate/BaseResultIterators.java | 57 +++-
.../iterate/DefaultParallelScanGrouper.java | 11 +
.../iterate/MapReduceParallelScanGrouper.java | 48 +++-
.../phoenix/iterate/ParallelScanGrouper.java | 16 ++
.../phoenix/query/ConnectionQueryServices.java | 15 ++
.../phoenix/query/ConnectionQueryServicesImpl.java | 100 +++++--
.../query/ConnectionlessQueryServicesImpl.java | 16 +-
.../query/DelegateConnectionQueryServices.java | 9 +
.../org/apache/phoenix/query/QueryServices.java | 7 +
.../TestingMapReduceParallelScanGrouper.java | 10 +
.../query/ConnectionQueryServicesImplTest.java | 49 ++--
.../org/apache/phoenix/query/QueryPlanTest.java | 12 +-
17 files changed, 664 insertions(+), 76 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
index 37e3be38e3..fe7bf13d6b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
@@ -154,7 +154,7 @@ public abstract class BaseTenantSpecificViewIndexIT extends SplitSystemCatalogIT
if (saltBuckets == null) {
iteratorTypeAndScanSize = "PARALLEL 1-WAY";
} else {
- iteratorTypeAndScanSize = "PARALLEL 3-WAY";
+ iteratorTypeAndScanSize = "PARALLEL " + saltBuckets + "-WAY";
}
clientSortAlgo = "CLIENT MERGE SORT";
expectedTableName =
@@ -169,7 +169,7 @@ public abstract class BaseTenantSpecificViewIndexIT extends SplitSystemCatalogIT
keyRanges = " [" + (Short.MIN_VALUE + expectedIndexIdOffset)
+ ",'" + tenantId + "','" + valuePrefix + "v2-1']";
} else {
- iteratorTypeAndScanSize = "PARALLEL 3-WAY";
+ iteratorTypeAndScanSize = "PARALLEL " + saltBuckets + "-WAY";
clientSortAlgo = "CLIENT MERGE SORT";
keyRanges = " [0," + (Short.MIN_VALUE + expectedIndexIdOffset)
+ ",'" + tenantId + "','" + valuePrefix + "v2-1'] - ["
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java
index 4edd395239..7fe9f6aefa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsDisabledIT.java
@@ -24,12 +24,22 @@ import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.util.List;
+import java.util.Properties;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.end2end.ExplainPlanWithStatsEnabledIT.Estimate;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
/**
* This class has tests for asserting the bytes and rows information exposed in the explain plan
@@ -242,6 +252,296 @@ public class ExplainPlanWithStatsDisabledIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testRangeScanWithMetadataLookup() throws Exception {
+ final String tableName = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(new Properties());
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + "(PK1 VARCHAR NOT NULL, "
+ + "PK2 VARCHAR, COL1 VARCHAR"
+ + " CONSTRAINT pk PRIMARY KEY (PK1, PK2)) SPLIT ON ('b', 'c', 'd')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('0123A', 'pk20', 'col10')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('#0123A', 'pk20', 'col10')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('_0123A', 'pk20', 'col10')");
+ for (int i = 0; i < 25; i++) {
+ String pk1Val = "a" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2a', 'col10a')");
+ pk1Val = "ab" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2ab', 'col10ab')");
+ }
+ for (int i = 0; i < 25; i++) {
+ String pk1Val = "b" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2b', 'col10b')");
+ pk1Val = "bc" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2bc', 'col10bc')");
+ }
+ for (int i = 0; i < 25; i++) {
+ String pk1Val = "c" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2c', 'col10c')");
+ pk1Val = "cd" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2cd', 'col10cd')");
+ }
+ for (int i = 0; i < 25; i++) {
+ String pk1Val = "d" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2d', 'col10d')");
+ pk1Val = "de" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2de', 'col10de')");
+ }
+ for (int i = 0; i < 25; i++) {
+ String pk1Val = "e" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2e', 'col10e')");
+ pk1Val = "ef" + i;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES ('" + pk1Val + "', 'pk2ef', 'col10ef')");
+ }
+ conn.commit();
+
+ String query = "select count(*) from " + tableName
+ + " where PK1 <= 'b'";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(53, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ String queryPlan = QueryUtil.getExplainPlan(rs);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['b']\n"
+ + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + " SERVER AGGREGATE INTO SINGLE ROW",
+ queryPlan);
+ ExplainPlan plan = conn.prepareStatement(query)
+ .unwrap(PhoenixPreparedStatement.class)
+ .optimizeQuery()
+ .getExplainPlan();
+ ExplainPlanAttributes planAttributes = plan.getPlanStepsAsAttributes();
+ assertEquals(2, planAttributes.getNumRegionLocationLookups());
+
+ query = "select count(*) from " + tableName
+ + " where PK1 <= 'cd'";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(128, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ queryPlan = QueryUtil.getExplainPlan(rs);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['cd']\n"
+ + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + " SERVER AGGREGATE INTO SINGLE ROW",
+ queryPlan);
+ plan = conn.prepareStatement(query)
+ .unwrap(PhoenixPreparedStatement.class)
+ .optimizeQuery()
+ .getExplainPlan();
+ planAttributes = plan.getPlanStepsAsAttributes();
+ assertEquals(3, planAttributes.getNumRegionLocationLookups());
+
+ query = "select count(*) from " + tableName
+ + " where PK1 LIKE 'ef%'";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(25, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ queryPlan = QueryUtil.getExplainPlan(rs);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['ef'] - ['eg']\n"
+ + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + " SERVER AGGREGATE INTO SINGLE ROW",
+ queryPlan);
+ plan = conn.prepareStatement(query)
+ .unwrap(PhoenixPreparedStatement.class)
+ .optimizeQuery()
+ .getExplainPlan();
+ planAttributes = plan.getPlanStepsAsAttributes();
+ assertEquals(1, planAttributes.getNumRegionLocationLookups());
+
+ query = "select count(*) from " + tableName
+ + " where PK1 > 'de'";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(75, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ queryPlan = QueryUtil.getExplainPlan(rs);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['de'] - [*]\n"
+ + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + " SERVER AGGREGATE INTO SINGLE ROW",
+ queryPlan);
+ plan = conn.prepareStatement(query)
+ .unwrap(PhoenixPreparedStatement.class)
+ .optimizeQuery()
+ .getExplainPlan();
+ planAttributes = plan.getPlanStepsAsAttributes();
+ assertEquals(1, planAttributes.getNumRegionLocationLookups());
+ }
+ }
+
+ @Test
+ public void testMultiTenantWithMetadataLookup() throws Exception {
+ final String tableName = generateUniqueName();
+ final String view01 = generateUniqueName();
+ final String view02 = generateUniqueName();
+ final String view03 = generateUniqueName();
+ final String view04 = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(new Properties());
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + "(TENANT_ID VARCHAR NOT NULL, "
+ + "PK2 VARCHAR, COL1 VARCHAR"
+ + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2)) MULTI_TENANT = true"
+ + " SPLIT ON ('b', 'c', 'd')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('0123A', 'pk20', 'col10')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('#0123A', 'pk20', 'col10')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('_0123A', 'pk20', 'col10')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('bcde', 'pk20', 'col10')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('cdef', 'pk20', 'col10')");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('defg', 'pk20', 'col10')");
+ conn.commit();
+
+ try (Connection tenantConn = getTenantConnection("ab12")) {
+ tenantConn.createStatement().execute("CREATE VIEW " + view01
+ + " (COL2 VARCHAR) AS SELECT * FROM " + tableName);
+ for (int i = 0; i < 25; i++) {
+ String pk2Val = "012" + i;
+ tenantConn.createStatement().execute(
+ "UPSERT INTO " + view01 + "(PK2, COL1, COL2) VALUES ('" + pk2Val
+ + "', 'col101', 'col201')");
+ pk2Val = "ab" + i;
+ tenantConn.createStatement().execute(
+ "UPSERT INTO " + view01 + "(PK2, COL1, COL2) VALUES ('" + pk2Val
+ + "', 'col1010', 'col2010')");
+ }
+ tenantConn.commit();
+ }
+
+ try (Connection tenantConn = getTenantConnection("bc12")) {
+ tenantConn.createStatement().execute("CREATE VIEW " + view02
+ + " (COL2 VARCHAR) AS SELECT * FROM " + tableName);
+ for (int i = 0; i < 25; i++) {
+ String pk2Val = "012" + i;
+ tenantConn.createStatement().execute(
+ "UPSERT INTO " + view02 + "(PK2, COL1, COL2) VALUES ('" + pk2Val
+ + "', 'col101', 'col201')");
+ pk2Val = "ab" + i;
+ tenantConn.createStatement().execute(
+ "UPSERT INTO " + view02 + "(PK2, COL1, COL2) VALUES ('" + pk2Val
+ + "', 'col1010', 'col2010')");
+ }
+ tenantConn.commit();
+ }
+
+ try (Connection tenantConn = getTenantConnection("cd12")) {
+ tenantConn.createStatement().execute("CREATE VIEW " + view03
+ + " (COL2 VARCHAR) AS SELECT * FROM " + tableName);
+ for (int i = 0; i < 25; i++) {
+ String pk2Val = "012" + i;
+ tenantConn.createStatement().execute(
+ "UPSERT INTO " + view03 + "(PK2, COL1, COL2) VALUES ('" + pk2Val
+ + "', 'col101', 'col201')");
+ pk2Val = "ab" + i;
+ tenantConn.createStatement().execute(
+ "UPSERT INTO " + view03 + "(PK2, COL1, COL2) VALUES ('" + pk2Val
+ + "', 'col1010', 'col2010')");
+ }
+ tenantConn.commit();
+ }
+
+ try (Connection tenantConn = getTenantConnection("de12")) {
+ tenantConn.createStatement().execute("CREATE VIEW " + view04
+ + " (COL2 VARCHAR) AS SELECT * FROM " + tableName);
+ for (int i = 0; i < 25; i++) {
+ String pk2Val = "012" + i;
+ tenantConn.createStatement().execute(
+ "UPSERT INTO " + view04 + "(PK2, COL1, COL2) VALUES ('" + pk2Val
+ + "', 'col101', 'col201')");
+ pk2Val = "ab" + i;
+ tenantConn.createStatement().execute(
+ "UPSERT INTO " + view04 + "(PK2, COL1, COL2) VALUES ('" + pk2Val
+ + "', 'col1010', 'col2010')");
+ }
+ tenantConn.commit();
+ }
+
+ try (Connection tenantConn = getTenantConnection("ab12")) {
+ String query = "select count(*) from " + view01;
+ ResultSet rs = tenantConn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(50, rs.getInt(1));
+
+ rs = tenantConn.createStatement().executeQuery("EXPLAIN " + query);
+ String queryPlan = QueryUtil.getExplainPlan(rs);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['ab12']\n"
+ + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + " SERVER AGGREGATE INTO SINGLE ROW",
+ queryPlan);
+ ExplainPlan plan = tenantConn.prepareStatement(query)
+ .unwrap(PhoenixPreparedStatement.class)
+ .optimizeQuery()
+ .getExplainPlan();
+ ExplainPlanAttributes planAttributes = plan.getPlanStepsAsAttributes();
+ assertEquals(1, planAttributes.getNumRegionLocationLookups());
+ }
+
+ try (Connection tenantConn = getTenantConnection("cd12")) {
+ String query = "select * from " + view03 + " order by col2";
+
+ ResultSet rs = tenantConn.createStatement().executeQuery("EXPLAIN " + query);
+ String queryPlan = QueryUtil.getExplainPlan(rs);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['cd12']\n"
+ + " SERVER SORTED BY [COL2]\n"
+ + "CLIENT MERGE SORT",
+ queryPlan);
+ ExplainPlan plan = tenantConn.prepareStatement(query)
+ .unwrap(PhoenixPreparedStatement.class)
+ .optimizeQuery()
+ .getExplainPlan();
+ ExplainPlanAttributes planAttributes = plan.getPlanStepsAsAttributes();
+ assertEquals(1, planAttributes.getNumRegionLocationLookups());
+ }
+
+ try (Connection tenantConn = getTenantConnection("de12")) {
+ String query = "select * from " + view04 + " where col1='col101'";
+ ResultSet rs = tenantConn.createStatement().executeQuery(query);
+ int c = 0;
+ while (rs.next()) {
+ c++;
+ }
+ assertEquals(25, c);
+
+ rs = tenantConn.createStatement().executeQuery("EXPLAIN " + query);
+ String queryPlan = QueryUtil.getExplainPlan(rs);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['de12']\n"
+ + " SERVER FILTER BY COL1 = 'col101'",
+ queryPlan);
+ ExplainPlan plan = tenantConn.prepareStatement(query)
+ .unwrap(PhoenixPreparedStatement.class)
+ .optimizeQuery()
+ .getExplainPlan();
+ ExplainPlanAttributes planAttributes = plan.getPlanStepsAsAttributes();
+ assertEquals(1, planAttributes.getNumRegionLocationLookups());
+ }
+ }
+ }
+
+ private Connection getTenantConnection(final String tenantId) throws Exception {
+ Properties tenantProps = new Properties();
+ tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ return DriverManager.getConnection(getUrl(), tenantProps);
+ }
+
public static void assertEstimatesAreNull(String sql, List<Object> binds, Connection conn)
throws Exception {
Estimate info = getByteRowEstimates(conn, sql, binds);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificViewIndexSaltedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificViewIndexSaltedIT.java
index 7780adba6f..f6a123b92e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificViewIndexSaltedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificViewIndexSaltedIT.java
@@ -17,31 +17,46 @@
*/
package org.apache.phoenix.end2end;
+import java.util.Arrays;
+import java.util.Collection;
+
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
public class TenantSpecificViewIndexSaltedIT extends BaseTenantSpecificViewIndexIT {
- private static final Integer SALT_BUCKETS = 3;
-
+
+ private final Integer saltBuckets;
+
+ public TenantSpecificViewIndexSaltedIT(Integer saltBuckets) {
+ this.saltBuckets = saltBuckets;
+ }
+
+ @Parameterized.Parameters(name = "TenantSpecificViewIndexSaltedIT_SaltBuckets={0}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {{3}, {13}, {39}});
+ }
+
@Test
public void testUpdatableSaltedView() throws Exception {
- testUpdatableView(SALT_BUCKETS);
+ testUpdatableView(saltBuckets);
}
-
+
@Test
public void testUpdatableViewsWithSameNameDifferentTenants() throws Exception {
- testUpdatableViewsWithSameNameDifferentTenants(SALT_BUCKETS);
+ testUpdatableViewsWithSameNameDifferentTenants(saltBuckets);
}
-
+
@Test
public void testUpdatableSaltedViewWithLocalIndex() throws Exception {
- testUpdatableView(SALT_BUCKETS, true);
+ testUpdatableView(saltBuckets, true);
}
@Test
public void testUpdatableViewsWithSameNameDifferentTenantsWithLocalIndex() throws Exception {
- testUpdatableViewsWithSameNameDifferentTenants(SALT_BUCKETS, true);
+ testUpdatableViewsWithSameNameDifferentTenants(saltBuckets, true);
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
index 8b1bdf99aa..160f90cd0a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java
@@ -91,6 +91,38 @@ public class SaltedTableIT extends BaseSaltedTableIT {
}
}
+ @Test
+ public void testPointLookupOnSaltedTable2() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String tableName = generateUniqueName();
+ String query =
+ "CREATE TABLE " + tableName + " (A integer not null, B integer "
+ + "CONSTRAINT pk PRIMARY KEY (A)) SALT_BUCKETS = 10";
+ conn.createStatement().execute(query);
+ PreparedStatement stmt =
+ conn.prepareStatement("UPSERT INTO " + tableName + " values(?, ?)");
+ for (int i = 0; i < 1000; i++) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, i + 10);
+ stmt.execute();
+ }
+ conn.commit();
+ for (int i = 0; i < 1000; i++) {
+ query = "SELECT * FROM " + tableName + " WHERE A = " + i;
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(i, rs.getInt("A"));
+ assertEquals(i + 10, rs.getInt("B"));
+ assertFalse(rs.next());
+ query = "explain " + query;
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(QueryUtil.getExplainPlan(rs)
+ .contains("CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER"));
+ }
+ }
+ }
+
@Test
public void testTableWithSplit() throws Exception {
try {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlanAttributes.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlanAttributes.java
index a41f776345..d6c4d0398a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlanAttributes.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlanAttributes.java
@@ -76,6 +76,7 @@ public class ExplainPlanAttributes {
private final ExplainPlanAttributes rhsJoinQueryExplainPlan;
private final Set<PColumn> serverMergeColumns;
private final List<HRegionLocation> regionLocations;
+ private final int numRegionLocationLookups;
private static final ExplainPlanAttributes EXPLAIN_PLAN_INSTANCE =
new ExplainPlanAttributes();
@@ -116,6 +117,7 @@ public class ExplainPlanAttributes {
this.rhsJoinQueryExplainPlan = null;
this.serverMergeColumns = null;
this.regionLocations = null;
+ this.numRegionLocationLookups = 0;
}
public ExplainPlanAttributes(String abstractExplainPlan,
@@ -135,8 +137,8 @@ public class ExplainPlanAttributes {
Integer clientOffset, Integer clientRowLimit,
Integer clientSequenceCount, String clientCursorName,
String clientSortAlgo,
- ExplainPlanAttributes rhsJoinQueryExplainPlan,
- Set<PColumn> serverMergeColumns, List<HRegionLocation> regionLocations) {
+ ExplainPlanAttributes rhsJoinQueryExplainPlan, Set<PColumn> serverMergeColumns,
+ List<HRegionLocation> regionLocations, int numRegionLocationLookups) {
this.abstractExplainPlan = abstractExplainPlan;
this.splitsChunk = splitsChunk;
this.estimatedRows = estimatedRows;
@@ -172,6 +174,7 @@ public class ExplainPlanAttributes {
this.rhsJoinQueryExplainPlan = rhsJoinQueryExplainPlan;
this.serverMergeColumns = serverMergeColumns;
this.regionLocations = regionLocations;
+ this.numRegionLocationLookups = numRegionLocationLookups;
}
public String getAbstractExplainPlan() {
@@ -314,6 +317,10 @@ public class ExplainPlanAttributes {
return regionLocations;
}
+ public int getNumRegionLocationLookups() {
+ return numRegionLocationLookups;
+ }
+
public static ExplainPlanAttributes getDefaultExplainPlan() {
return EXPLAIN_PLAN_INSTANCE;
}
@@ -354,6 +361,7 @@ public class ExplainPlanAttributes {
private ExplainPlanAttributes rhsJoinQueryExplainPlan;
private Set<PColumn> serverMergeColumns;
private List<HRegionLocation> regionLocations;
+ private int numRegionLocationLookups;
public ExplainPlanAttributesBuilder() {
// default
@@ -407,6 +415,7 @@ public class ExplainPlanAttributes {
explainPlanAttributes.getRhsJoinQueryExplainPlan();
this.serverMergeColumns = explainPlanAttributes.getServerMergeColumns();
this.regionLocations = explainPlanAttributes.getRegionLocations();
+ this.numRegionLocationLookups = explainPlanAttributes.getNumRegionLocationLookups();
}
public ExplainPlanAttributesBuilder setAbstractExplainPlan(
@@ -616,6 +625,12 @@ public class ExplainPlanAttributes {
return this;
}
+ public ExplainPlanAttributesBuilder setNumRegionLocationLookups(
+ int numRegionLocationLookups) {
+ this.numRegionLocationLookups = numRegionLocationLookups;
+ return this;
+ }
+
public ExplainPlanAttributes build() {
return new ExplainPlanAttributes(abstractExplainPlan, splitsChunk,
estimatedRows, estimatedSizeInBytes, iteratorTypeAndScanSize,
@@ -629,7 +644,7 @@ public class ExplainPlanAttributes {
clientAfterAggregate, clientDistinctFilter, clientOffset,
clientRowLimit, clientSequenceCount, clientCursorName,
clientSortAlgo, rhsJoinQueryExplainPlan, serverMergeColumns,
- regionLocations);
+ regionLocations, numRegionLocationLookups);
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index fd4cd46911..721b123418 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -172,6 +172,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
protected Map<ImmutableBytesPtr,ServerCache> caches;
private final QueryPlan dataPlan;
private static boolean forTestingSetTimeoutToMaxToLetQueryPassHere = false;
+ private int numRegionLocationLookups = 0;
static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
@Override
@@ -581,9 +582,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return scans;
}
- private List<HRegionLocation> getRegionBoundaries(ParallelScanGrouper scanGrouper)
- throws SQLException{
- return scanGrouper.getRegionBoundaries(context, physicalTableName);
+ private List<HRegionLocation> getRegionBoundaries(ParallelScanGrouper scanGrouper,
+ byte[] startRegionBoundaryKey, byte[] stopRegionBoundaryKey) throws SQLException {
+ return scanGrouper.getRegionBoundaries(context, physicalTableName, startRegionBoundaryKey,
+ stopRegionBoundaryKey);
}
private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
@@ -677,7 +679,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
* @throws SQLException
*/
private ScansWithRegionLocations getParallelScans(Scan scan) throws SQLException {
- List<HRegionLocation> regionLocations = getRegionBoundaries(scanGrouper);
+ List<HRegionLocation> regionLocations =
+ getRegionBoundaries(scanGrouper, scan.getStartRow(), scan.getStopRow());
+ numRegionLocationLookups = regionLocations.size();
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
int regionIndex = 0;
int stopIndex = regionBoundaries.size();
@@ -932,8 +936,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
SchemaUtil.processSplit(new byte[] { 0 }, table.getPKColumns());
byte[] splitPostfix =
Arrays.copyOfRange(sampleProcessedSaltByte, 1, sampleProcessedSaltByte.length);
- List<HRegionLocation> regionLocations = getRegionBoundaries(scanGrouper);
- List<byte[]> regionBoundaries = toBoundaries(regionLocations);
boolean isSalted = table.getBucketNum() != null;
GuidePostsInfo gps = getGuidePosts();
// case when stats wasn't collected
@@ -974,6 +976,44 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
int regionIndex = 0;
int startRegionIndex = 0;
+
+ List<HRegionLocation> regionLocations;
+ if (isSalted && !isLocalIndex) {
+ // key prefix = salt num + view index id + tenant id
+ // If salting is used with tenant or view index id, scan start and end
+ // rowkeys will not be empty. We need to generate region locations for
+ // all the scan range such that we cover (each salt bucket num) + (prefix starting from
+ // index position 1 to cover view index and/or tenant id and/or remaining prefix).
+ if (scan.getStartRow().length > 0 && scan.getStopRow().length > 0) {
+ regionLocations = new ArrayList<>();
+ for (int i = 0; i < getTable().getBucketNum(); i++) {
+ byte[] saltStartRegionKey = new byte[scan.getStartRow().length];
+ saltStartRegionKey[0] = (byte) i;
+ System.arraycopy(scan.getStartRow(), 1, saltStartRegionKey, 1,
+ scan.getStartRow().length - 1);
+
+ byte[] saltStopRegionKey = new byte[scan.getStopRow().length];
+ saltStopRegionKey[0] = (byte) i;
+ System.arraycopy(scan.getStopRow(), 1, saltStopRegionKey, 1,
+ scan.getStopRow().length - 1);
+
+ regionLocations.addAll(
+ getRegionBoundaries(scanGrouper, saltStartRegionKey, saltStopRegionKey));
+ }
+ } else {
+ // If scan start and end rowkeys are empty, we end up fetching all region locations.
+ regionLocations =
+ getRegionBoundaries(scanGrouper, startRegionBoundaryKey, stopRegionBoundaryKey);
+ }
+ } else {
+ // For range scans, startRegionBoundaryKey and stopRegionBoundaryKey should refer
+ // to the boundary specified by the scan context.
+ regionLocations =
+ getRegionBoundaries(scanGrouper, startRegionBoundaryKey, stopRegionBoundaryKey);
+ }
+
+ numRegionLocationLookups = regionLocations.size();
+ List<byte[]> regionBoundaries = toBoundaries(regionLocations);
int stopIndex = regionBoundaries.size();
if (startRegionBoundaryKey.length > 0) {
startRegionIndex = regionIndex = getIndexContainingInclusive(regionBoundaries, startRegionBoundaryKey);
@@ -1618,6 +1658,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return this.scans.size();
}
+ public int getNumRegionLocationLookups() {
+ return this.numRegionLocationLookups;
+ }
+
@Override
public void explain(List<String> planSteps) {
explainUtil(planSteps, null);
@@ -1662,6 +1706,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if (explainPlanAttributesBuilder != null) {
explainPlanAttributesBuilder.setIteratorTypeAndScanSize(
iteratorTypeAndScanSize);
+ explainPlanAttributesBuilder.setNumRegionLocationLookups(getNumRegionLocationLookups());
}
if (this.plan.getStatement().getTableSamplingRate() != null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
index 2064619b1e..5252be410e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
@@ -73,4 +73,15 @@ public class DefaultParallelScanGrouper implements ParallelScanGrouper {
throws SQLException {
return context.getConnection().getQueryServices().getAllTableRegions(tableName);
}
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getRegionBoundaries(StatementContext context,
+ byte[] tableName, byte[] startRegionBoundaryKey, byte[] stopRegionBoundaryKey)
+ throws SQLException {
+ return context.getConnection().getQueryServices()
+ .getTableRegions(tableName, startRegionBoundaryKey, stopRegionBoundaryKey);
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index 088432ae53..511467e693 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -65,24 +65,44 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
public List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName) throws SQLException {
String snapshotName;
Configuration conf = context.getConnection().getQueryServices().getConfiguration();
- if((snapshotName = getSnapshotName(conf)) != null) {
- try {
- Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
- FileSystem fs = rootDir.getFileSystem(conf);
- Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
- SnapshotDescription snapshotDescription = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
- SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDescription);
- return getRegionLocationsFromManifest(manifest);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- else {
+ if ((snapshotName = getSnapshotName(conf)) != null) {
+ return getRegionLocationsFromSnapshot(conf, snapshotName);
+ } else {
return context.getConnection().getQueryServices().getAllTableRegions(tableName);
}
}
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName,
+ byte[] startRegionBoundaryKey, byte[] stopRegionBoundaryKey) throws SQLException {
+ String snapshotName;
+ Configuration conf = context.getConnection().getQueryServices().getConfiguration();
+ if ((snapshotName = getSnapshotName(conf)) != null) {
+ return getRegionLocationsFromSnapshot(conf, snapshotName);
+ } else {
+ return context.getConnection().getQueryServices()
+ .getTableRegions(tableName, startRegionBoundaryKey, stopRegionBoundaryKey);
+ }
+ }
+
+ private List<HRegionLocation> getRegionLocationsFromSnapshot(Configuration conf,
+ String snapshotName) {
+ try {
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+ SnapshotDescription snapshotDescription =
+ SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+ SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDescription);
+ return getRegionLocationsFromManifest(manifest);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Get list of region locations from SnapshotManifest
* BaseResultIterators assume that regions are sorted using RegionInfo.COMPARATOR
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
index e53bc0a3b5..2b9d81e095 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
@@ -43,4 +43,20 @@ public interface ParallelScanGrouper {
List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName) throws SQLException;
+ /**
+ * Retrieve table region locations that cover the startRegionBoundaryKey and
+ * stopRegionBoundaryKey. The start key of the first region of the returned list must be less
+ * than or equal to startRegionBoundaryKey. The end key of the last region of the returned list
+ * must be greater than or equal to stopRegionBoundaryKey.
+ *
+ * @param context Statement Context.
+ * @param tableName Table name.
+ * @param startRegionBoundaryKey Start region boundary key.
+ * @param stopRegionBoundaryKey Stop region boundary key.
+ * @return The list of region locations that cover the startRegionBoundaryKey and
+ * stopRegionBoundaryKey key boundary.
+ * @throws SQLException If fails to retrieve region locations.
+ */
+ List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName,
+ byte[] startRegionBoundaryKey, byte[] stopRegionBoundaryKey) throws SQLException;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index c6a3df2bc8..cb56bd47a7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -99,6 +99,21 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException;
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException;
+ /**
+ * Retrieve table region locations that cover the startRowKey and endRowKey. The start key
+ * of the first region of the returned list must be less than or equal to startRowKey.
+ * The end key of the last region of the returned list must be greater than or equal to
+ * endRowKey.
+ *
+ * @param tableName Table name.
+ * @param startRowKey Start RowKey.
+ * @param endRowKey End RowKey.
+ * @return The list of region locations that cover the startRowKey and endRowKey key boundary.
+ * @throws SQLException If fails to retrieve region locations.
+ */
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey,
+ byte[] endRowKey) throws SQLException;
+
public PhoenixConnection connect(String url, Properties info) throws SQLException;
/**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 8983ae7b0e..dbeaad4f8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -63,9 +63,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
@@ -654,54 +652,112 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
((ClusterConnection)connection).clearRegionCache(tableName);
}
- public byte[] getNextRegionStartKey(HRegionLocation regionLocation, byte[] currentKey) throws IOException {
+ public byte[] getNextRegionStartKey(HRegionLocation regionLocation, byte[] currentKey,
+ HRegionLocation prevRegionLocation) throws IOException {
// in order to check the overlap/inconsistencies bad region info, we have to make sure
// the current endKey always increasing(compare the previous endKey)
- // note :- currentKey is the previous regions endKey
- if ((Bytes.compareTo(regionLocation.getRegionInfo().getStartKey(), currentKey) != 0
- || Bytes.compareTo(regionLocation.getRegionInfo().getEndKey(), currentKey) <= 0)
+
+ // conditionOne = true if the currentKey does not belong to the region boundaries specified
+ // by regionLocation i.e. if the currentKey is less than the region startKey or if the
+ // currentKey is greater than or equal to the region endKey.
+
+ // conditionTwo = true if the previous region endKey is either not same as current region
+ // startKey or if the previous region endKey is greater than or equal to current region
+ // endKey.
+ boolean conditionOne =
+ (Bytes.compareTo(regionLocation.getRegion().getStartKey(), currentKey) > 0
+ || Bytes.compareTo(regionLocation.getRegion().getEndKey(), currentKey) <= 0)
&& !Bytes.equals(currentKey, HConstants.EMPTY_START_ROW)
- && !Bytes.equals(regionLocation.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
+ && !Bytes.equals(regionLocation.getRegion().getEndKey(), HConstants.EMPTY_END_ROW);
+ boolean conditionTwo = prevRegionLocation != null && (
+ Bytes.compareTo(regionLocation.getRegion().getStartKey(),
+ prevRegionLocation.getRegion().getEndKey()) != 0 ||
+ Bytes.compareTo(regionLocation.getRegion().getEndKey(),
+ prevRegionLocation.getRegion().getEndKey()) <= 0)
+ && !Bytes.equals(prevRegionLocation.getRegion().getEndKey(), HConstants.EMPTY_START_ROW)
+ && !Bytes.equals(regionLocation.getRegion().getEndKey(), HConstants.EMPTY_END_ROW);
+ if (conditionOne || conditionTwo) {
String regionNameString =
- new String(regionLocation.getRegionInfo().getRegionName(), StandardCharsets.UTF_8);
- throw new IOException(String.format(
- "HBase region information overlap/inconsistencies on region %s", regionNameString));
- }
- return regionLocation.getRegionInfo().getEndKey();
+ new String(regionLocation.getRegion().getRegionName(), StandardCharsets.UTF_8);
+ LOGGER.error(
+ "HBase region overlap/inconsistencies on {} , current key: {} , region startKey:"
+ + " {} , region endKey: {} , prev region startKey: {} , prev region endKey: {}",
+ regionLocation,
+ Bytes.toStringBinary(currentKey),
+ Bytes.toStringBinary(regionLocation.getRegion().getStartKey()),
+ Bytes.toStringBinary(regionLocation.getRegion().getEndKey()),
+ prevRegionLocation == null ?
+ "null" : Bytes.toStringBinary(prevRegionLocation.getRegion().getStartKey()),
+ prevRegionLocation == null ?
+ "null" : Bytes.toStringBinary(prevRegionLocation.getRegion().getEndKey()));
+ throw new IOException(
+ String.format("HBase region information overlap/inconsistencies on region %s",
+ regionNameString));
+ }
+ return regionLocation.getRegion().getEndKey();
}
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
+ return getTableRegions(tableName, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey,
+ byte[] endRowKey) throws SQLException {
/*
* Use HConnection.getRegionLocation as it uses the cache in HConnection, while getting
* all region locations from the HTable doesn't.
*/
- int retryCount = 0, maxRetryCount = 1;
+ int retryCount = 0;
+ int maxRetryCount =
+ config.getInt(PHOENIX_GET_REGIONS_RETRIES, DEFAULT_PHOENIX_GET_REGIONS_RETRIES);
TableName table = TableName.valueOf(tableName);
+ byte[] currentKey = null;
+ HRegionLocation prevRegionLocation = null;
while (true) {
try {
// We could surface the package projected HConnectionImplementation.getNumberOfCachedRegionLocations
// to get the sizing info we need, but this would require a new class in the same package and a cast
// to this implementation class, so it's probably not worth it.
List<HRegionLocation> locations = Lists.newArrayList();
- byte[] currentKey = HConstants.EMPTY_START_ROW;
+ currentKey = startRowKey;
do {
- HRegionLocation regionLocation = ((ClusterConnection)connection).getRegionLocation(
- table, currentKey, false);
- currentKey = getNextRegionStartKey(regionLocation, currentKey);
+ HRegionLocation regionLocation =
+ ((ClusterConnection) connection).getRegionLocation(table,
+ currentKey, false);
+ currentKey =
+ getNextRegionStartKey(regionLocation, currentKey, prevRegionLocation);
locations.add(regionLocation);
+ prevRegionLocation = regionLocation;
+ if (!Bytes.equals(endRowKey, HConstants.EMPTY_END_ROW)
+ && Bytes.compareTo(currentKey, endRowKey) >= 0) {
+ break;
+ }
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW));
return locations;
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
throw new TableNotFoundException(table.getNameAsString());
} catch (IOException e) {
LOGGER.error("Exception encountered in getAllTableRegions for "
- + "table: {}, retryCount: {}", table.getNameAsString(), retryCount, e);
- if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating
+ + "table: {}, retryCount: {} , currentKey: {} , startRowKey: {} ,"
+ + " endRowKey: {}",
+ table.getNameAsString(),
+ retryCount,
+ Bytes.toStringBinary(currentKey),
+ Bytes.toStringBinary(startRowKey),
+ Bytes.toStringBinary(endRowKey),
+ e);
+ if (retryCount++ < maxRetryCount) {
continue;
}
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL)
- .setRootCause(e).build().buildException();
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.GET_TABLE_REGIONS_FAIL).setRootCause(e).build()
+ .buildException();
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 68f1376eeb..f215786f2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -226,14 +226,24 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
@Override
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
+ return getTableRegions(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey,
+ byte[] endRowKey) throws SQLException {
List<HRegionLocation> regions = tableSplits.get(Bytes.toString(tableName));
if (regions != null) {
return regions;
}
RegionInfo hri =
- RegionInfoBuilder.newBuilder(TableName.valueOf(tableName))
- .setStartKey(HConstants.EMPTY_START_ROW)
- .setStartKey(HConstants.EMPTY_END_ROW).build();
+ RegionInfoBuilder.newBuilder(TableName.valueOf(tableName))
+ .setStartKey(startRowKey)
+ .setStartKey(endRowKey)
+ .build();
return Collections.singletonList(new HRegionLocation(hri, SERVER_NAME, -1));
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 5d576f19af..302a1203f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -86,6 +86,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
return getDelegate().getAllTableRegions(tableName);
}
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey,
+ byte[] endRowKey) throws SQLException {
+ return getDelegate().getTableRegions(tableName, startRowKey, endRowKey);
+ }
+
@Override
public void addTable(PTable table, long resolvedTime) throws SQLException {
getDelegate().addTable(table, resolvedTime);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 38ee38e4fe..c01edfbbc3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -430,6 +430,13 @@ public interface QueryServices extends SQLCloseable {
String PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE =
"phoenix.paging.start.newscan.startrow.include";
+ /**
+ * Num of retries while retrieving the region location details for the given table.
+ */
+ String PHOENIX_GET_REGIONS_RETRIES = "phoenix.get.table.regions.retries";
+
+ int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 3;
+
/**
* Get executor service used for parallel scans
*/
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java
index 77b3a7efbb..27e7c67e0a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java
@@ -44,6 +44,16 @@ public class TestingMapReduceParallelScanGrouper extends MapReduceParallelScanGr
return regionLocations;
}
+ @Override
+ public List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName,
+ byte[] startRegionBoundaryKey, byte[] stopRegionBoundaryKey) throws SQLException {
+ List<HRegionLocation> regionLocations =
+ super.getRegionBoundaries(context, tableName, startRegionBoundaryKey,
+ stopRegionBoundaryKey);
+ numCallsToGetRegionBoundaries.incrementAndGet();
+ return regionLocations;
+ }
+
public static int getNumCallsToGetRegionBoundaries() {
return numCallsToGetRegionBoundaries.get();
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index fc2d980423..078fc8d8e2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -49,8 +49,8 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -161,9 +161,12 @@ public class ConnectionQueryServicesImplTest {
@Test
public void testGetNextRegionStartKey() {
- HRegionInfo mockHRegionInfo = org.mockito.Mockito.mock(HRegionInfo.class);
+ RegionInfo mockHRegionInfo = org.mockito.Mockito.mock(RegionInfo.class);
+ RegionInfo mockPrevHRegionInfo = org.mockito.Mockito.mock(RegionInfo.class);
HRegionLocation mockRegionLocation = org.mockito.Mockito.mock(HRegionLocation.class);
- ConnectionQueryServicesImpl mockCqsi = org.mockito.Mockito.mock(ConnectionQueryServicesImpl.class,
+ HRegionLocation mockPrevRegionLocation = org.mockito.Mockito.mock(HRegionLocation.class);
+ ConnectionQueryServicesImpl mockCqsi =
+ org.mockito.Mockito.mock(ConnectionQueryServicesImpl.class,
org.mockito.Mockito.CALLS_REAL_METHODS);
byte[] corruptedStartAndEndKey = "0x3000".getBytes();
byte[] corruptedDecreasingKey = "0x2999".getBytes();
@@ -172,20 +175,26 @@ public class ConnectionQueryServicesImplTest {
byte[] notCorruptedEndKey = "0x3000".getBytes();
byte[] notCorruptedNewKey = "0x3001".getBytes();
byte[] mockTableName = "dummyTable".getBytes();
- when(mockRegionLocation.getRegionInfo()).thenReturn(mockHRegionInfo);
+ when(mockRegionLocation.getRegion()).thenReturn(mockHRegionInfo);
when(mockHRegionInfo.getRegionName()).thenReturn(mockTableName);
+ when(mockPrevRegionLocation.getRegion()).thenReturn(mockPrevHRegionInfo);
+ when(mockPrevHRegionInfo.getRegionName()).thenReturn(mockTableName);
// comparing the current regionInfo endKey is equal to the previous endKey
// [0x3000, Ox3000) vs 0x3000
when(mockHRegionInfo.getStartKey()).thenReturn(corruptedStartAndEndKey);
when(mockHRegionInfo.getEndKey()).thenReturn(corruptedStartAndEndKey);
- testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedStartAndEndKey, true);
+ when(mockPrevHRegionInfo.getEndKey()).thenReturn(corruptedStartAndEndKey);
+ testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedStartAndEndKey, true,
+ mockPrevRegionLocation);
// comparing the current regionInfo endKey is less than previous endKey
// [0x3000,0x2999) vs 0x3000
when(mockHRegionInfo.getStartKey()).thenReturn(corruptedStartAndEndKey);
when(mockHRegionInfo.getEndKey()).thenReturn(corruptedDecreasingKey);
- testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedStartAndEndKey, true);
+ when(mockPrevHRegionInfo.getEndKey()).thenReturn(corruptedStartAndEndKey);
+ testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedStartAndEndKey, true,
+ mockPrevRegionLocation);
// comparing the current regionInfo endKey is greater than the previous endKey
// [0x3000,0x3000) vs 0x3001
@@ -193,41 +202,51 @@ public class ConnectionQueryServicesImplTest {
// [0x2999,0x3001) vs 0x3000
when(mockHRegionInfo.getStartKey()).thenReturn(corruptedDecreasingKey);
when(mockHRegionInfo.getEndKey()).thenReturn(corruptedNewEndKey);
- testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedStartAndEndKey, true);
+ when(mockPrevHRegionInfo.getEndKey()).thenReturn(corruptedStartAndEndKey);
+ testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedStartAndEndKey, true,
+ mockPrevRegionLocation);
// comparing the current regionInfo startKey is greater than the previous endKey leading to a hole
// [0x3000,0x3001) vs 0x2999
when(mockHRegionInfo.getStartKey()).thenReturn(corruptedStartAndEndKey);
when(mockHRegionInfo.getEndKey()).thenReturn(corruptedNewEndKey);
- testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedDecreasingKey, true);
+ when(mockPrevHRegionInfo.getEndKey()).thenReturn(corruptedDecreasingKey);
+ testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedDecreasingKey, true,
+ mockPrevRegionLocation);
// comparing the current regionInfo startKey is less than the previous endKey leading to an overlap
- // [0x2999,0x3001) vs 0x3000
+ // [0x2999,0x3001) vs 0x3000.
when(mockHRegionInfo.getStartKey()).thenReturn(corruptedDecreasingKey);
when(mockHRegionInfo.getEndKey()).thenReturn(corruptedNewEndKey);
- testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedStartAndEndKey, true);
+ when(mockPrevHRegionInfo.getEndKey()).thenReturn(corruptedStartAndEndKey);
+ testGetNextRegionStartKey(mockCqsi, mockRegionLocation, corruptedStartAndEndKey, true,
+ mockPrevRegionLocation);
// comparing the current regionInfo startKey is equal to the previous endKey
// [0x3000,0x3001) vs 0x3000
when(mockHRegionInfo.getStartKey()).thenReturn(corruptedStartAndEndKey);
when(mockHRegionInfo.getEndKey()).thenReturn(notCorruptedNewKey);
- testGetNextRegionStartKey(mockCqsi, mockRegionLocation, notCorruptedEndKey, false);
+ when(mockPrevHRegionInfo.getEndKey()).thenReturn(notCorruptedEndKey);
+ testGetNextRegionStartKey(mockCqsi, mockRegionLocation, notCorruptedEndKey, false,
+ mockPrevRegionLocation);
// test EMPTY_START_ROW
when(mockHRegionInfo.getStartKey()).thenReturn(HConstants.EMPTY_START_ROW);
when(mockHRegionInfo.getEndKey()).thenReturn(notCorruptedEndKey);
- testGetNextRegionStartKey(mockCqsi, mockRegionLocation, HConstants.EMPTY_START_ROW, false);
+ testGetNextRegionStartKey(mockCqsi, mockRegionLocation, HConstants.EMPTY_START_ROW, false,
+ null);
//test EMPTY_END_ROW
when(mockHRegionInfo.getStartKey()).thenReturn(notCorruptedStartKey);
when(mockHRegionInfo.getEndKey()).thenReturn(HConstants.EMPTY_END_ROW);
- testGetNextRegionStartKey(mockCqsi, mockRegionLocation, notCorruptedStartKey, false);
+ testGetNextRegionStartKey(mockCqsi, mockRegionLocation, notCorruptedStartKey, false, null);
}
private void testGetNextRegionStartKey(ConnectionQueryServicesImpl mockCqsi,
- HRegionLocation mockRegionLocation, byte[] key, boolean isCorrupted) {
+ HRegionLocation mockRegionLocation, byte[] key, boolean isCorrupted,
+ HRegionLocation mockPrevRegionLocation) {
try {
- mockCqsi.getNextRegionStartKey(mockRegionLocation, key);
+ mockCqsi.getNextRegionStartKey(mockRegionLocation, key, mockPrevRegionLocation);
if (isCorrupted) {
fail();
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
index d87989c93e..9e043b171f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
@@ -247,8 +247,12 @@ public class QueryPlanTest extends BaseConnectionlessQueryTest {
String query = "select * from foo where a = 'a' and b >= timestamp '2016-01-28 00:00:00' and b < timestamp '2016-01-29 00:00:00'";
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
String queryPlan = QueryUtil.getExplainPlan(rs);
+ // For real connection CQSI, the result is supposed to be 20-WAY RANGE SCAN, however
+ // for connection-less impl, since we retrieve region locations for 20 splits and each
+ // time we get all region locations due to connection-less specific impl, we get
+ // 20*20 = 400-WAY RANGE SCAN.
assertEquals(
- "CLIENT PARALLEL 20-WAY RANGE SCAN OVER FOO [0,'a',~'2016-01-28 23:59:59.999'] - [19,'a',~'2016-01-28 00:00:00.000']\n" +
+ "CLIENT PARALLEL 400-WAY RANGE SCAN OVER FOO [0,'a',~'2016-01-28 23:59:59.999'] - [19,'a',~'2016-01-28 00:00:00.000']\n" +
" SERVER FILTER BY FIRST KEY ONLY\n" +
"CLIENT MERGE SORT", queryPlan);
} finally {
@@ -273,8 +277,12 @@ public class QueryPlanTest extends BaseConnectionlessQueryTest {
String query = "select * from " + tableName + " where a = 'a' and b >= timestamp '2016-01-28 00:00:00' and b < timestamp '2016-01-29 00:00:00'";
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
String queryPlan = QueryUtil.getExplainPlan(rs);
+ // For real connection CQSI, the result is supposed to be 20-WAY RANGE SCAN, however
+ // for connection-less impl, since we retrieve region locations for 20 splits and each
+ // time we get all region locations due to connection-less specific impl, we get
+ // 20*20 = 400-WAY RANGE SCAN.
assertEquals(
- "CLIENT PARALLEL 20-WAY ROUND ROBIN RANGE SCAN OVER " + tableName + " [0,'a',~'2016-01-28 23:59:59.999'] - [19,'a',~'2016-01-28 00:00:00.000']\n" +
+ "CLIENT PARALLEL 400-WAY ROUND ROBIN RANGE SCAN OVER " + tableName + " [0,'a',~'2016-01-28 23:59:59.999'] - [19,'a',~'2016-01-28 00:00:00.000']\n" +
" SERVER FILTER BY FIRST KEY ONLY", queryPlan);
} finally {
conn.close();