Posted to commits@phoenix.apache.org by st...@apache.org on 2023/07/21 04:32:26 UTC
[phoenix] branch master updated: PHOENIX-6910 Scans created during query compilation and execution against salted tables need to be more resilient
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new bbc915437e PHOENIX-6910 Scans created during query compilation and execution against salted tables need to be more resilient
bbc915437e is described below
commit bbc915437e026fc415c20fa06136044b257ce919
Author: Istvan Toth <st...@apache.org>
AuthorDate: Mon May 15 07:33:04 2023 +0200
PHOENIX-6910 Scans created during query compilation and execution against salted tables need to be more resilient
Co-authored-by: Karthik Palanisamy <kp...@cloudera.com>
Co-authored-by: Jacob Isaac <ja...@gmail.com>
---
.../phoenix/end2end/ParallelStatsDisabledIT.java | 6 -
.../end2end/salted/SaltedTableMergeBucketsIT.java | 487 +++++++++++++++++++++
.../org/apache/phoenix/compile/ScanRanges.java | 65 ++-
.../org/apache/phoenix/compile/WhereOptimizer.java | 1 +
.../phoenix/iterate/BaseResultIterators.java | 106 ++---
.../iterate/DefaultParallelScanGrouper.java | 64 +--
.../iterate/MapReduceParallelScanGrouper.java | 6 +-
.../phoenix/iterate/ParallelScanGrouper.java | 20 +-
.../phoenix/iterate/ParallelScansCollector.java | 57 +++
.../java/org/apache/phoenix/util/SchemaUtil.java | 8 +-
.../java/org/apache/phoenix/query/BaseTest.java | 7 +
.../java/org/apache/phoenix/util/TestUtil.java | 3 +-
12 files changed, 722 insertions(+), 108 deletions(-)
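For readers new to salted tables: Phoenix prepends a single salt byte to each row key and pre-splits the table so that each salt byte value starts in its own region. The sketch below is only illustrative (the hash is made up; the real logic lives in SaltingUtil), but it shows why, once regions are merged, one region can hold rows from several salt buckets — the situation this commit makes scan creation resilient to.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class SaltSketch {
        // Prepend a single salt byte derived from the row key; the table is normally
        // pre-split so that each salt byte value begins its own region.
        static byte[] salt(byte[] rowKey, int buckets) {
            int bucket = (Arrays.hashCode(rowKey) & Integer.MAX_VALUE) % buckets;
            byte[] salted = new byte[rowKey.length + 1];
            salted[0] = (byte) bucket;
            System.arraycopy(rowKey, 0, salted, 1, rowKey.length);
            return salted;
        }

        public static void main(String[] args) {
            byte[] salted = salt("20191201".getBytes(StandardCharsets.UTF_8), 11);
            // After a region merge, a single region may span several salt buckets,
            // so per-region scans can no longer assume one leading salt byte.
            System.out.println("bucket = " + Byte.toUnsignedInt(salted[0]));
        }
    }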
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
index 782faf8389..3cd9dc7202 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
@@ -67,12 +67,6 @@ public abstract class ParallelStatsDisabledIT extends BaseTest {
BaseTest.freeResourcesIfBeyondThreshold();
}
- public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
- PreparedStatement statement = conn.prepareStatement(queryBuilder.build());
- ResultSet rs = statement.executeQuery();
- return rs;
- }
-
public static ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
ResultSet rs = null;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableMergeBucketsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableMergeBucketsIT.java
new file mode 100644
index 0000000000..01fc614836
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableMergeBucketsIT.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.salted;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.assertResultSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
+import org.apache.phoenix.end2end.ParallelStatsEnabledTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryBuilder;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(ParallelStatsEnabledTest.class)
+public class SaltedTableMergeBucketsIT extends ParallelStatsEnabledIT {
+
+ @Test
+ public void testWithVariousPKTypes() throws Exception {
+
+ int[] variousSaltBuckets = new int[] { 11, 23, 31 };
+ SortOrder[][] sortOrdersCase1 =
+ new SortOrder[][] { { SortOrder.ASC, SortOrder.ASC, SortOrder.ASC },
+ { SortOrder.ASC, SortOrder.ASC, SortOrder.DESC },
+ { SortOrder.ASC, SortOrder.DESC, SortOrder.ASC },
+ { SortOrder.ASC, SortOrder.DESC, SortOrder.DESC },
+ { SortOrder.DESC, SortOrder.ASC, SortOrder.ASC },
+ { SortOrder.DESC, SortOrder.ASC, SortOrder.DESC },
+ { SortOrder.DESC, SortOrder.DESC, SortOrder.ASC },
+ { SortOrder.DESC, SortOrder.DESC, SortOrder.DESC } };
+
+ SortOrder[][] sortOrdersCase2 =
+ new SortOrder[][] { { SortOrder.ASC, SortOrder.ASC, SortOrder.ASC, SortOrder.ASC },
+ { SortOrder.ASC, SortOrder.ASC, SortOrder.ASC, SortOrder.DESC },
+ { SortOrder.ASC, SortOrder.ASC, SortOrder.DESC, SortOrder.ASC },
+ { SortOrder.ASC, SortOrder.ASC, SortOrder.DESC, SortOrder.DESC },
+ { SortOrder.ASC, SortOrder.DESC, SortOrder.DESC, SortOrder.ASC },
+ { SortOrder.ASC, SortOrder.DESC, SortOrder.DESC, SortOrder.DESC },
+ { SortOrder.ASC, SortOrder.DESC, SortOrder.ASC, SortOrder.ASC },
+ { SortOrder.ASC, SortOrder.DESC, SortOrder.ASC, SortOrder.DESC },
+ { SortOrder.DESC, SortOrder.ASC, SortOrder.ASC, SortOrder.ASC },
+ { SortOrder.DESC, SortOrder.ASC, SortOrder.ASC, SortOrder.DESC },
+ { SortOrder.DESC, SortOrder.ASC, SortOrder.DESC, SortOrder.ASC },
+ { SortOrder.DESC, SortOrder.ASC, SortOrder.DESC, SortOrder.DESC },
+ { SortOrder.DESC, SortOrder.DESC, SortOrder.ASC, SortOrder.ASC },
+ { SortOrder.DESC, SortOrder.DESC, SortOrder.ASC, SortOrder.DESC },
+ { SortOrder.DESC, SortOrder.DESC, SortOrder.DESC, SortOrder.ASC },
+ { SortOrder.DESC, SortOrder.DESC, SortOrder.DESC, SortOrder.DESC } };
+
+ for (int index = 0; index < sortOrdersCase1.length; index++) {
+ // Test Case 1: PK1 = Bigint, PK2 = Decimal, PK3 = Bigint
+ String baseTableNameCase1 = generateUniqueName();
+ PDataType[] testIntDecIntPKTypes =
+ new PDataType[] { PLong.INSTANCE, PDecimal.INSTANCE, PLong.INSTANCE };
+ long nowTime1 =
+ createTableCase1(baseTableNameCase1, variousSaltBuckets[0],
+ testIntDecIntPKTypes[0], sortOrdersCase1[index][0], testIntDecIntPKTypes[1],
+ sortOrdersCase1[index][1], testIntDecIntPKTypes[2],
+ sortOrdersCase1[index][2]);
+ testIntDecIntPK(baseTableNameCase1, nowTime1, sortOrdersCase1[index]);
+ mergeRegions(baseTableNameCase1);
+ testIntDecIntPK(baseTableNameCase1, nowTime1, sortOrdersCase1[index]);
+
+ // Test Case 2: PK1 = Integer, PK2 = Integer, PK3 = Integer, PK4 = Integer
+ String baseTableNameCase2 = generateUniqueName();
+ PDataType[] testIntIntIntIntPKTypes =
+ new PDataType[] { PInteger.INSTANCE, PInteger.INSTANCE, PInteger.INSTANCE,
+ PInteger.INSTANCE };
+ createTableCase2(baseTableNameCase2, variousSaltBuckets[0], testIntIntIntIntPKTypes[0],
+ sortOrdersCase2[index][0], testIntIntIntIntPKTypes[1], sortOrdersCase2[index][1],
+ testIntIntIntIntPKTypes[2], sortOrdersCase2[index][2], testIntIntIntIntPKTypes[3],
+ sortOrdersCase2[index][3]);
+ doTestGroupByOrderMatchPkColumnOrderBug4690(baseTableNameCase2);
+ mergeRegions(baseTableNameCase2);
+ doTestGroupByOrderMatchPkColumnOrderBug4690(baseTableNameCase2);
+ }
+ }
+
+ private void testIntDecIntPK(String tableName, long nowTime, SortOrder[] sortOrder)
+ throws SQLException {
+ String testName = "testIntDecIntPK";
+ String testSQL1 =
+ String.format(
+ "SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN ((%d, 21, 1),(%d, 2, 31))",
+ tableName, nowTime, nowTime);
+ String testSQL2 =
+ String.format("SELECT ROW_ID FROM %s WHERE (ID2, ID3) IN ((21.0, 1),(2.0, 3))",
+ tableName);
+ String testSQL3 =
+ String.format("SELECT ROW_ID FROM %s WHERE (ID1, ID2) IN ((%d, 21.0),(%d, 2.0))",
+ tableName, nowTime + 1, nowTime + 1);
+ String testSQL4 =
+ String.format(
+ "SELECT ROW_ID FROM %s WHERE (ID3, ID2, ID1) IN ((3, 21.0, %d),(3, 2.0, %d))",
+ tableName, nowTime + 1, nowTime + 1);
+ String testSQL5 =
+ String.format(
+ "SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN ((%d, 21.0, 3),(%d, 2.0, 3))",
+ tableName, nowTime + 1, nowTime + 1);
+ String testSQL6 =
+ String.format("SELECT ROW_ID FROM %s WHERE ID1 = %d AND ID2 = 2.0", tableName,
+ nowTime);
+ String testSQL7 =
+ String.format("SELECT ROW_ID FROM %s WHERE ID1 >= %d AND ID1 < %d", tableName,
+ nowTime, nowTime + 3);
+ String testSQL8 =
+ String.format(
+ "SELECT ROW_ID FROM %s WHERE (ID1 = %d OR ID1 = %d OR ID1 = %d) AND (ID3 = %d)",
+ tableName, nowTime, nowTime + 3, nowTime + 2, 5);
+
+ Set<String> expecteds1 = Collections.<String> emptySet();
+ Set<String> expecteds2 = Sets.newHashSet(new String[] { "row0", "row1" });
+ Set<String> expecteds3 = Sets.newHashSet(new String[] { "row1", "row2" });
+ Set<String> expecteds4 = Sets.newHashSet(new String[] { "row1" });
+ Set<String> expecteds5 = Sets.newHashSet(new String[] { "row1" });
+ Set<String> expecteds6 = Sets.newHashSet(new String[] { "row0" });
+ Set<String> expecteds7 = Sets.newHashSet(new String[] { "row0", "row1", "row2", "row3" });
+ Set<String> expecteds8 = Sets.newHashSet(new String[] { "row3" });
+
+ assertExpectedWithWhere(testName, testSQL1, expecteds1, expecteds1.size());
+ assertExpectedWithWhere(testName, testSQL2, expecteds2, expecteds2.size());
+ assertExpectedWithWhere(testName, testSQL3, expecteds3, expecteds3.size());
+ assertExpectedWithWhere(testName, testSQL4, expecteds4, expecteds4.size());
+ assertExpectedWithWhere(testName, testSQL5, expecteds5, expecteds5.size());
+ assertExpectedWithWhere(testName, testSQL6, expecteds6, expecteds6.size());
+ assertExpectedWithWhere(testName, testSQL7, expecteds7, expecteds7.size());
+ assertExpectedWithWhere(testName, testSQL8, expecteds8, expecteds8.size());
+ }
+
+ private void assertExpectedWithWhere(String testType, String testSQL, Set<String> expectedSet,
+ int expectedCount) throws SQLException {
+ String context = "sql: " + testSQL + ", type: " + testType;
+
+ try (Connection tenantConnection = DriverManager.getConnection(getUrl())) {
+ // perform the query
+ ResultSet rs = tenantConnection.createStatement().executeQuery(testSQL);
+ for (int i = 0; i < expectedCount; i++) {
+ assertTrue(
+ "did not include result '" + expectedSet.toString() + "' (" + context + ")",
+ rs.next());
+ String actual = rs.getString(1);
+ assertTrue(context, expectedSet.contains(actual));
+ }
+ assertFalse(context, rs.next());
+ }
+ }
+
+ private void doTestGroupByOrderMatchPkColumnOrderBug4690(String tableName) throws Exception {
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ QueryBuilder queryBuilder =
+ new QueryBuilder().setSelectExpression("PK2,PK1,COUNT(V)")
+ .setSelectExpressionColumns(Lists.newArrayList("PK1", "PK2", "V"))
+ .setFullTableName(tableName).setGroupByClause("PK2, PK1")
+ .setOrderByClause("PK2, PK1");
+ ResultSet rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 2, 3, 2L }, { 3, 2, 1L }, { 7, 2, 2L },
+ { 8, 1, 2L }, { 9, 1, 4L } });
+
+ queryBuilder.setSelectExpression("PK1, PK2, COUNT(V)");
+ queryBuilder.setOrderByClause("PK1, PK2");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 1, 8, 2L }, { 1, 9, 4L }, { 2, 3, 1L },
+ { 2, 7, 2L }, { 3, 2, 2L } });
+
+ queryBuilder.setSelectExpression("PK2,PK1,COUNT(V)");
+ queryBuilder.setOrderByClause("PK2 DESC,PK1 DESC");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 9, 1, 4L }, { 8, 1, 2L }, { 7, 2, 2L },
+ { 3, 2, 1L }, { 2, 3, 2L } });
+
+ queryBuilder.setSelectExpression("PK1,PK2,COUNT(V)");
+ queryBuilder.setOrderByClause("PK1 DESC,PK2 DESC");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 3, 2, 2L }, { 2, 7, 2L }, { 2, 3, 1L },
+ { 1, 9, 4L }, { 1, 8, 2L } });
+
+ queryBuilder.setSelectExpression("PK3,PK2,COUNT(V)");
+ queryBuilder.setSelectExpressionColumns(Lists.newArrayList("PK1", "PK2", "PK3", "V"));
+ queryBuilder.setFullTableName(tableName);
+ queryBuilder.setGroupByClause("PK3,PK2");
+ queryBuilder.setOrderByClause("PK3,PK2");
+ queryBuilder.setWhereClause("PK1=1");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 5, 9, 1L }, { 6, 9, 2L }, { 7, 9, 1L },
+ { 10, 8, 1L }, { 11, 8, 1L } });
+
+ queryBuilder.setSelectExpression("PK2,PK3,COUNT(V)");
+ queryBuilder.setOrderByClause("PK2,PK3");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 8, 10, 1L }, { 8, 11, 1L }, { 9, 5, 1L },
+ { 9, 6, 2L }, { 9, 7, 1L } });
+
+ queryBuilder.setSelectExpression("PK3,PK2,COUNT(V)");
+ queryBuilder.setOrderByClause("PK3 DESC,PK2 DESC");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 11, 8, 1L }, { 10, 8, 1L }, { 7, 9, 1L },
+ { 6, 9, 2L }, { 5, 9, 1L } });
+
+ queryBuilder.setSelectExpression("PK2,PK3,COUNT(V)");
+ queryBuilder.setOrderByClause("PK2 DESC,PK3 DESC");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 9, 7, 1L }, { 9, 6, 2L }, { 9, 5, 1L },
+ { 8, 11, 1L }, { 8, 10, 1L } });
+
+ queryBuilder.setSelectExpression("PK4,PK3,PK1,COUNT(V)");
+ queryBuilder.setSelectExpressionColumns(
+ Lists.newArrayList("PK1", "PK2", "PK3", "PK4", "V"));
+ queryBuilder.setFullTableName(tableName);
+ queryBuilder.setWhereClause("PK2=9 ");
+ queryBuilder.setGroupByClause("PK4,PK3,PK1");
+ queryBuilder.setOrderByClause("PK4,PK3,PK1");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 8, 7, 1, 1L }, { 12, 6, 1, 1L },
+ { 13, 6, 1, 1L }, { 22, 5, 1, 1L } });
+
+ queryBuilder.setSelectExpression("PK1,PK3,PK4,COUNT(V)");
+ queryBuilder.setOrderByClause("PK1,PK3,PK4");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 1, 5, 22, 1L }, { 1, 6, 12, 1L },
+ { 1, 6, 13, 1L }, { 1, 7, 8, 1L } });
+
+ queryBuilder.setSelectExpression("PK4,PK3,PK1,COUNT(V)");
+ queryBuilder.setOrderByClause("PK4 DESC,PK3 DESC,PK1 DESC");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 22, 5, 1, 1L }, { 13, 6, 1, 1L },
+ { 12, 6, 1, 1L }, { 8, 7, 1, 1L } });
+
+ queryBuilder.setSelectExpression("PK1,PK3,PK4,COUNT(V)");
+ queryBuilder.setOrderByClause("PK1 DESC,PK3 DESC,PK4 DESC");
+ rs = executeQuery(conn, queryBuilder);
+ assertResultSet(rs, new Object[][] { { 1, 7, 8, 1L }, { 1, 6, 13, 1L },
+ { 1, 6, 12, 1L }, { 1, 5, 22, 1L } });
+ }
+ }
+
+ public void mergeRegions(String testTableName) throws Exception {
+ Admin admin =
+ driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ List<RegionInfo> regions = admin.getRegions(TableName.valueOf(testTableName));
+
+ for (int i = 0; i < regions.size() - 1; i += 2) {
+ byte[][] regionsToMerge = new byte[2][];
+ regionsToMerge[0] = regions.get(i).getEncodedNameAsBytes();
+ regionsToMerge[1] = regions.get(i + 1).getEncodedNameAsBytes();
+ admin.mergeRegionsAsync(regionsToMerge, false).get();
+ }
+ }
+
+ private long createTableCase1(String baseTable, int saltBuckets, PDataType pkType1,
+ SortOrder pk1Order, PDataType pkType2, SortOrder pk2Order, PDataType pkType3,
+ SortOrder pk3Order) throws SQLException {
+
+ String pkType1Str = getType(pkType1);
+ String pkType2Str = getType(pkType2);
+ String pkType3Str = getType(pkType3);
+
+ try (Connection tenantConnection = DriverManager.getConnection(getUrl())) {
+ try (Statement cstmt = tenantConnection.createStatement()) {
+ String TABLE_TEMPLATE =
+ "CREATE TABLE IF NOT EXISTS %s(ID1 %s not null,ID2 %s not null, "
+ + "ID3 %s not null, ROW_ID VARCHAR, V VARCHAR CONSTRAINT pk "
+ + " PRIMARY KEY (ID1 %s, ID2 %s, ID3 %s)) SALT_BUCKETS=%d ";
+ cstmt.execute(String.format(TABLE_TEMPLATE, baseTable, pkType1Str, pkType2Str,
+ pkType3Str, pk1Order.name(), pk2Order.name(), pk3Order.name(), saltBuckets));
+ }
+ }
+
+ long nowTime = System.currentTimeMillis();
+ List<String> UPSERT_SQLS = new ArrayList<>();
+ UPSERT_SQLS.addAll(Arrays.asList(new String[] {
+ String.format(
+ "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+ baseTable, nowTime, 2.0, 3, "row0", "v1"),
+ String.format(
+ "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+ baseTable, nowTime + 1, 2.0, 3, "row1", "v2"),
+ String.format(
+ "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+ baseTable, nowTime + 1, 2.0, 5, "row2", "v3"),
+ String.format(
+ "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+ baseTable, nowTime + 2, 4.0, 5, "row3", "v4"),
+ String.format(
+ "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+ baseTable, nowTime + 3, 6.0, 7, "row4", "v5") }));
+
+ for (int i = 5; i < 100; i++) {
+ UPSERT_SQLS.add(String.format(
+ "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+ baseTable, nowTime + i, 10.0, 13, "row" + i, "v" + i));
+ }
+
+ try (Connection tenantConnection = DriverManager.getConnection(getUrl())) {
+ tenantConnection.setAutoCommit(true);
+ try (Statement ustmt = tenantConnection.createStatement()) {
+ for (String upsertSql : UPSERT_SQLS) {
+ ustmt.execute(upsertSql);
+ }
+ }
+ }
+
+ return nowTime;
+ }
+
+ private void createTableCase2(String baseTable, int saltBuckets, PDataType pkType1,
+ SortOrder pk1Order, PDataType pkType2, SortOrder pk2Order, PDataType pkType3,
+ SortOrder pk3Order, PDataType pkType4, SortOrder pk4Order) throws SQLException {
+
+ String pkType1Str = getType(pkType1);
+ String pkType2Str = getType(pkType2);
+ String pkType3Str = getType(pkType3);
+ String pkType4Str = getType(pkType4);
+
+ try (Connection globalConnection = DriverManager.getConnection(getUrl())) {
+ try (Statement cstmt = globalConnection.createStatement()) {
+ String TABLE_TEMPLATE =
+ "CREATE TABLE IF NOT EXISTS %s(PK1 %s not null,PK2 %s not null, "
+ + " PK3 %s not null,PK4 %s not null, V INTEGER CONSTRAINT "
+ + " pk PRIMARY KEY (PK1 %s, PK2 %s, PK3 %s, PK4 %s)) SALT_BUCKETS=%d ";
+ cstmt.execute(String.format(TABLE_TEMPLATE, baseTable, pkType1Str, pkType2Str,
+ pkType3Str, pkType4Str, pk1Order.name(), pk2Order.name(), pk3Order.name(),
+ pk4Order.name(), saltBuckets));
+ }
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,8,10,20,30)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,8,11,21,31)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,9,5 ,22,32)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,9,6 ,12,33)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,9,6 ,13,34)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,9,7 ,8,35)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (2,3,15,25,35)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (2,7,16,26,36)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (2,7,17,27,37)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (3,2,18,28,38)");
+ conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (3,2,19,29,39)");
+ conn.commit();
+ }
+ }
+
+ private String getType(PDataType pkType) {
+ String pkTypeStr = "VARCHAR(25)";
+ switch (pkType.getSqlType()) {
+ case Types.VARCHAR:
+ pkTypeStr = "VARCHAR(25)";
+ break;
+ case Types.CHAR:
+ pkTypeStr = "CHAR(15)";
+ break;
+ case Types.DECIMAL:
+ pkTypeStr = "DECIMAL(8,2)";
+ break;
+ case Types.INTEGER:
+ pkTypeStr = "INTEGER";
+ break;
+ case Types.BIGINT:
+ pkTypeStr = "BIGINT";
+ break;
+ case Types.DATE:
+ pkTypeStr = "DATE";
+ break;
+ case Types.TIMESTAMP:
+ pkTypeStr = "TIMESTAMP";
+ break;
+ default:
+ pkTypeStr = "VARCHAR(25)";
+ }
+ return pkTypeStr;
+ }
+
+ @Test
+ public void testMergesWithWideGuidepostsAndWithStatsForParallelization() throws Exception {
+ testMerges(true, true);
+ }
+
+ @Test
+ public void testMergesWithWideGuidepostsAndWithoutStatsForParallelization() throws Exception {
+ testMerges(true, false);
+ }
+
+ @Test
+ public void testMergesWithoutWideGuidepostsAndWithStatsForParallelization() throws Exception {
+ testMerges(false, true);
+ }
+
+ @Test
+ public void testMergesWithoutWideGuidepostsAndWithoutStatsForParallelization()
+ throws Exception {
+ testMerges(false, false);
+ }
+
+ public void testMerges(boolean withWideGuideposts, boolean withStatsForParallelization)
+ throws Exception {
+ String tableName = generateUniqueName();
+
+ List<String> range =
+ IntStream.range(0, 100).boxed().map(s -> String.format("201912%03d", s))
+ .collect(Collectors.toList());
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.FALSE.toString());
+ if (!withStatsForParallelization) {
+ props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.FALSE.toString());
+ }
+ try (Connection connection = DriverManager.getConnection(getUrl(), props);
+ Statement statement = connection.createStatement();) {
+ statement.execute("CREATE TABLE " + tableName
+ + " (c1 VARCHAR NOT NULL, c2 VARCHAR NOT NULL, c3 VARCHAR NOT NULL, v1 VARCHAR "
+ + " CONSTRAINT pk PRIMARY KEY(c1,c2,c3)) SALT_BUCKETS=11");
+ for (String c1 : range) {
+ statement.execute(" upsert into " + tableName + " values('" + c1
+ + "','HORTONWORKS_WEEKLY_TEST','v3','" + c1 + "')");
+ }
+ connection.commit();
+
+ if (withWideGuideposts) {
+ // This is way bigger than the regions, guaranteeing no guideposts inside the
+ // original regions
+ statement.execute(
+ "UPDATE STATISTICS " + tableName + " SET GUIDE_POSTS_WIDTH = 1000000");
+ } else {
+ // Use the default guidepost width for these tests (20 bytes)
+ statement.execute(
+ "UPDATE STATISTICS " + tableName);
+ }
+
+ mergeRegions(tableName);
+
+ for (String c1 : range) {
+ ResultSet rs =
+ statement.executeQuery("select c1, c2, c3, v1 from " + tableName
+ + " where c1='" + c1 + "' and c2 like '%HORTONWORKS_WEEKLY_TEST%'");
+ assertTrue(rs.next());
+ assertEquals(c1, rs.getString("c1"));
+ assertEquals(c1, rs.getString("v1"));
+ assertFalse(rs.next());
+ }
+ }
+ }
+}
\ No newline at end of file
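The mergeRegions() helper above collapses adjacent region pairs, so each surviving region can span more than one salt bucket — exactly the layout the fixed scan logic has to cope with. A quick sanity check, reusing the same Admin API and assuming the merge futures have completed, might look like:

    List<RegionInfo> before = admin.getRegions(TableName.valueOf(tableName));
    mergeRegions(tableName); // pairwise merges, as defined above
    List<RegionInfo> after = admin.getRegions(TableName.valueOf(tableName));
    assertTrue("expected fewer regions after merging", after.size() < before.size());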
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 1502315f50..94c3f87840 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -245,7 +245,70 @@ public class ScanRanges {
System.arraycopy(key, keyOffset, temp, 0, key.length - keyOffset);
return temp;
}
-
+
+ // This variant adds synthetic scan boundaries at potentially missing salt bucket boundaries
+ // and won't return null Scans
+ public List<Scan> intersectScan(Scan scan, final byte[] originalStartKey,
+ final byte[] originalStopKey, final int keyOffset, byte[] splitPostfix, Integer buckets,
+ boolean crossesRegionBoundary) {
+ // FIXME Both the salted status and the pre-computed bucket list should be available in
+ // this object, but in some cases they get overwritten, so we cannot use them here.
+ List<Scan> newScans = new ArrayList<Scan>();
+ if (buckets != null && buckets > 0) {
+ byte[] wrkStartKey = originalStartKey;
+ do {
+ boolean lastBucket = false;
+ byte[] nextBucketStart = null;
+ byte[] nextBucketByte = null;
+ if (wrkStartKey.length > 0 && Byte.toUnsignedInt(wrkStartKey[0]) >= buckets - 1) {
+ lastBucket = true;
+ } else {
+ // This includes the zero bytes from the minimum PK
+ nextBucketStart = bucketEnd(wrkStartKey, splitPostfix);
+ // This is the start of the next bucket in byte[], without the PK suffix
+ nextBucketByte = new byte[] { nextBucketStart[0] };
+ }
+ if (lastBucket || Bytes.compareTo(originalStopKey, nextBucketStart) <= 0) {
+ // either we don't need to add synthetic guideposts, or we already have, and
+ // are at the last bucket of the original scan
+ addIfNotNull(newScans, intersectScan(scan, wrkStartKey, originalStopKey,
+ keyOffset, crossesRegionBoundary));
+ break;
+ }
+ // This is where we add the synthetic guidepost.
+ // We skip [nextBucketByte, nextBucketStart), but it's guaranteed that there are no
+ // rows there.
+ addIfNotNull(newScans,
+ intersectScan(scan, wrkStartKey, nextBucketByte, keyOffset, false));
+ wrkStartKey = nextBucketStart;
+ } while (true);
+ } else {
+ // Definitely Not crossing buckets
+ addIfNotNull(newScans, intersectScan(scan, originalStartKey, originalStopKey, keyOffset,
+ crossesRegionBoundary));
+ }
+ return newScans;
+ }
+
+ private void addIfNotNull(List<Scan> scans, Scan newScan) {
+ if (newScan != null) {
+ scans.add(newScan);
+ }
+ }
+
+ // The split (presplit for salted tables) code extends the split point to the minimum PK length.
+ // Adding the same postfix here avoids creating an extra [n,n\x00\x00\x00..\x00) scan for each
+ // bucket.
+ private byte[] bucketEnd(byte[] key, byte[] splitPostfix) {
+ byte startByte = key.length > 0 ? key[0] : 0;
+ int nextBucket = Byte.toUnsignedInt(startByte) + 1;
+ byte[] bucketEnd = new byte[splitPostfix.length + 1];
+ bucketEnd[0] = (byte) nextBucket;
+ System.arraycopy(splitPostfix, 0, bucketEnd, 1, splitPostfix.length);
+ return bucketEnd;
+ }
+
+ //TODO split this for normal, salted and local index variants
public Scan intersectScan(Scan scan, final byte[] originalStartKey, final byte[] originalStopKey, final int keyOffset, boolean crossesRegionBoundary) {
byte[] startKey = originalStartKey;
byte[] stopKey = originalStopKey;
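To make the new intersectScan variant easier to follow, here is a standalone sketch of the same loop with the Scan plumbing stripped out: it walks a [start, stop) key range and cuts it at each synthetic salt-bucket boundary. The names and the byte comparison helper are illustrative (Phoenix uses Bytes.compareTo), and the sketch assumes non-empty, bounded keys.

    import java.util.ArrayList;
    import java.util.List;

    public class BucketRangeSketch {
        // Split [start, stop) into per-bucket sub-ranges, mirroring intersectScan() above.
        static List<byte[][]> splitAtBucketBoundaries(byte[] start, byte[] stop,
                int buckets, byte[] splitPostfix) {
            List<byte[][]> ranges = new ArrayList<>();
            byte[] cur = start;
            while (true) {
                int bucket = cur.length > 0 ? Byte.toUnsignedInt(cur[0]) : 0;
                if (bucket >= buckets - 1) {
                    // Already in the last bucket: keep the original stop key.
                    ranges.add(new byte[][] { cur, stop });
                    return ranges;
                }
                // Start of the next bucket, padded the same way the pre-split points are.
                byte[] nextBucketStart = new byte[splitPostfix.length + 1];
                nextBucketStart[0] = (byte) (bucket + 1);
                System.arraycopy(splitPostfix, 0, nextBucketStart, 1, splitPostfix.length);
                if (compare(stop, nextBucketStart) <= 0) {
                    // The stop key falls inside the current bucket.
                    ranges.add(new byte[][] { cur, stop });
                    return ranges;
                }
                // Synthetic boundary: end this sub-range at the bare next salt byte.
                ranges.add(new byte[][] { cur, new byte[] { nextBucketStart[0] } });
                cur = nextBucketStart;
            }
        }

        static int compare(byte[] a, byte[] b) {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int d = Byte.toUnsignedInt(a[i]) - Byte.toUnsignedInt(b[i]);
                if (d != 0) {
                    return d;
                }
            }
            return a.length - b.length;
        }
    }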
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 94b1972c9d..21011c24b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -155,6 +155,7 @@ public class WhereOptimizer {
keySlots = whereClause.accept(visitor);
if (keySlots == null && (tenantId == null || !table.isMultiTenant()) && table.getViewIndexId() == null && !minOffset.isPresent()) {
+ // FIXME this overwrites salting info in the scanRange
context.setScanRanges(ScanRanges.EVERYTHING);
return whereClause;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 2c6885bbf5..cecab8626f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -39,6 +39,7 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.sql.SQLException;
import java.util.*;
+import java.util.Arrays;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -671,24 +672,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
Math.min(estimate.lastUpdated,
gps.getGuidePostTimestamps()[guideIndex]);
}
-
- private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan,
- byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) {
- boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
- if (scan != null) {
- if (regionLocation.getServerName() != null) {
- scan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER, regionLocation.getServerName().getVersionedBytes());
- }
- if (useStatsForParallelization || crossedRegionBoundary) {
- scans.add(scan);
- }
- }
- if (startNewScan && !scans.isEmpty()) {
- parallelScans.add(scans);
- scans = Lists.newArrayListWithExpectedSize(1);
- }
- return scans;
- }
private List<List<Scan>> getParallelScans() throws SQLException {
// If the scan boundaries are not matching with scan in context that means we need to get
@@ -718,18 +701,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if (scan.getStopRow().length > 0) {
stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), scan.getStopRow()));
}
- List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
- List<Scan> scans = Lists.newArrayListWithExpectedSize(2);
+ ParallelScansCollector parallelScans = new ParallelScansCollector(scanGrouper);
while (regionIndex <= stopIndex) {
HRegionLocation regionLocation = regionLocations.get(regionIndex);
RegionInfo regionInfo = regionLocation.getRegion();
Scan newScan = ScanUtil.newScan(scan);
- byte[] endKey;
- if (regionIndex == stopIndex) {
- endKey = scan.getStopRow();
- } else {
- endKey = regionBoundaries.get(regionIndex);
- }
if (ScanUtil.isLocalIndex(scan)) {
ScanUtil.setLocalIndexAttributes(newScan, 0, regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getAttribute(SCAN_START_ROW_SUFFIX),
@@ -744,13 +720,14 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
newScan.withStopRow(regionInfo.getEndKey());
}
}
- scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
+ if (regionLocation.getServerName() != null) {
+ newScan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER,
+ regionLocation.getServerName().getVersionedBytes());
+ }
+ parallelScans.addNewScan(plan, newScan, true);
regionIndex++;
}
- if (!scans.isEmpty()) { // Add any remaining scans
- parallelScans.add(scans);
- }
- return parallelScans;
+ return parallelScans.getParallelScans();
}
private static class GuidePostEstimate {
@@ -924,7 +901,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// return true if we've clipped the rowKey
return maxOffset != offset;
}
-
+
/**
* Compute the list of parallel scans to run for a given query. The inner scans
* may be concatenated together directly, while the other ones may need to be
@@ -962,6 +939,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
Long.MAX_VALUE, false);
return parallelScans;
}
+ byte[] sampleProcessedSaltByte =
+ SchemaUtil.processSplit(new byte[] { 0 }, table.getPKColumns());
+ byte[] splitPostfix =
+ Arrays.copyOfRange(sampleProcessedSaltByte, 1, sampleProcessedSaltByte.length);
List<HRegionLocation> regionLocations = getRegionBoundaries(scanGrouper);
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
boolean isSalted = table.getBucketNum() != null;
@@ -1014,15 +995,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
stopKey = regionLocations.get(stopIndex).getRegion().getEndKey();
}
}
- List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+ ParallelScansCollector parallelScanCollector = new ParallelScansCollector(scanGrouper);
ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
int gpsSize = gps.getGuidePostsCount();
- int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
int keyOffset = 0;
ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
- List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
ImmutableBytesWritable guidePosts = gps.getGuidePosts();
ByteArrayInputStream stream = null;
DataInput input = null;
@@ -1118,13 +1097,28 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
byte[] initialKeyBytes = currentKeyBytes;
int gpsComparedToEndKey = -1;
boolean everNotDelayed = false;
- while (intersectWithGuidePosts && (endKey.length == 0 || (gpsComparedToEndKey=currentGuidePost.compareTo(endKey)) <= 0)) {
- Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
- false);
- if (newScan != null) {
- ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
- regionInfo.getStartKey(), regionInfo.getEndKey(),
- newScan.getStartRow(), newScan.getStopRow());
+ while (intersectWithGuidePosts && (endKey.length == 0
+ || (gpsComparedToEndKey = currentGuidePost.compareTo(endKey)) <= 0)) {
+ List<Scan> newScans =
+ scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes,
+ keyOffset, splitPostfix, getTable().getBucketNum(),
+ gpsComparedToEndKey == 0);
+ if (useStatsForParallelization) {
+ for (int newScanIdx = 0; newScanIdx < newScans.size(); newScanIdx++) {
+ Scan newScan = newScans.get(newScanIdx);
+ ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
+ regionInfo.getStartKey(), regionInfo.getEndKey(),
+ newScan.getStartRow(), newScan.getStopRow());
+ if (regionLocation.getServerName() != null) {
+ newScan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER,
+ regionLocation.getServerName().getVersionedBytes());
+ }
+ boolean lastOfNew = newScanIdx == newScans.size() - 1;
+ parallelScanCollector.addNewScan(plan, newScan,
+ gpsComparedToEndKey == 0 && lastOfNew);
+ }
+ }
+ if (newScans.size() > 0) {
// If we've delaying adding estimates, add the previous
// gp estimates now that we know they are in range.
if (delayAddingEst) {
@@ -1139,7 +1133,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
delayAddingEst = false;
}
everNotDelayed |= !delayAddingEst;
- scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
currentKeyBytes = currentGuidePostBytes;
try {
currentGuidePost = PrefixByteCodec.decode(decoder, input);
@@ -1158,10 +1151,21 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
*/
currentKeyBytes = initialKeyBytes;
}
- Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true);
- if (newScan != null) {
+ List<Scan> newScans =
+ scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset,
+ splitPostfix, getTable().getBucketNum(), true);
+ for (int newScanIdx = 0; newScanIdx < newScans.size(); newScanIdx++) {
+ Scan newScan = newScans.get(newScanIdx);
ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
+ if (regionLocation.getServerName() != null) {
+ newScan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER,
+ regionLocation.getServerName().getVersionedBytes());
+ }
+ boolean lastOfNew = newScanIdx == newScans.size() - 1;
+ parallelScanCollector.addNewScan(plan, newScan, lastOfNew);
+ }
+ if (newScans.size() > 0) {
// Boundary case of no GP in region after delaying adding of estimates
if (!gpsInThisRegion && delayAddingEst) {
updateEstimates(gps, guideIndex-1, estimates);
@@ -1171,7 +1175,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
} else if (!gpsInThisRegion) {
delayAddingEst = false;
}
- scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
currentKeyBytes = endKey;
// We have a guide post in the region if the above loop was entered
// or if the current key is less than the region end key (since the loop
@@ -1183,7 +1186,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
( regionIndex == startRegionIndex && gpsForFirstRegion ) || // GP in first region (before start key)
( gpsAfterStopKey = ( regionIndex == stopIndex && intersectWithGuidePosts && // GP in last region (after stop key)
( endRegionKey.length == 0 || // then check if gp is in the region
- currentGuidePost.compareTo(endRegionKey) < 0) ) );
+ currentGuidePost.compareTo(endRegionKey) < 0)));
if (gpsAfterStopKey) {
// If gp after stop key, but still in last region, track min ts as fallback
fallbackTs =
@@ -1192,16 +1195,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
regionIndex++;
}
- if (!scans.isEmpty()) { // Add any remaining scans
- parallelScans.add(scans);
- }
- generateEstimates(scanRanges, table, gps, emptyGuidePost, parallelScans, estimates,
+ generateEstimates(scanRanges, table, gps, emptyGuidePost, parallelScanCollector.getParallelScans(), estimates,
fallbackTs, gpsAvailableForAllRegions);
} finally {
if (stream != null) Closeables.closeQuietly(stream);
}
- sampleScans(parallelScans,this.plan.getStatement().getTableSamplingRate());
- return parallelScans;
+ sampleScans(parallelScanCollector.getParallelScans(),this.plan.getStatement().getTableSamplingRate());
+ return parallelScanCollector.getParallelScans();
}
private void generateEstimates(ScanRanges scanRanges, PTable table, GuidePostsInfo gps,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
index 646c0bb972..2064619b1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
import java.util.List;
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.QueryPlan;
@@ -28,7 +27,6 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.ScanUtil;
/**
@@ -37,38 +35,42 @@ import org.apache.phoenix.util.ScanUtil;
*/
public class DefaultParallelScanGrouper implements ParallelScanGrouper {
- private static DefaultParallelScanGrouper INSTANCE = new DefaultParallelScanGrouper();
+ private static DefaultParallelScanGrouper INSTANCE = new DefaultParallelScanGrouper();
+ public DefaultParallelScanGrouper() {
+ }
- public DefaultParallelScanGrouper() {
- }
-
- public static DefaultParallelScanGrouper getInstance() {
- return INSTANCE;
- }
-
+ public static DefaultParallelScanGrouper getInstance() {
+ return INSTANCE;
+ }
- @Override
- public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary) {
- PTable table = plan.getTableRef().getTable();
- boolean startNewScanGroup = false;
- if (!plan.isRowKeyOrdered()) {
- startNewScanGroup = true;
- } else if (crossedRegionBoundary) {
- if (table.getIndexType() == IndexType.LOCAL) {
- startNewScanGroup = true;
- } else if (table.getBucketNum() != null) {
- startNewScanGroup = scans.isEmpty() ||
- ScanUtil.crossesPrefixBoundary(startKey,
- ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES),
- SaltingUtil.NUM_SALTING_BYTES);
- }
+ /**
+ * Returns true if the scan with the startKey is to be the first of a new batch
+ */
+ @Override
+ public boolean shouldStartNewScan(QueryPlan plan, Scan lastScan, byte[] startKey,
+ boolean crossesRegionBoundary) {
+ PTable table = plan.getTableRef().getTable();
+ if (lastScan == null) {
+ return false;
+ } else if (!plan.isRowKeyOrdered()) {
+ return true;
+ } else if (crossesRegionBoundary && table.getIndexType() == IndexType.LOCAL) {
+ return true;
+ } else if (table.getBucketNum() != null ) {
+ return crossesRegionBoundary
+ || ScanUtil.crossesPrefixBoundary(startKey,
+ ScanUtil.getPrefix(lastScan.getStartRow(),
+ SaltingUtil.NUM_SALTING_BYTES),
+ SaltingUtil.NUM_SALTING_BYTES);
+ } else {
+ return false;
+ }
}
- return startNewScanGroup;
- }
- @Override
- public List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName) throws SQLException{
- return context.getConnection().getQueryServices().getAllTableRegions(tableName);
- }
+ @Override
+ public List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName)
+ throws SQLException {
+ return context.getConnection().getQueryServices().getAllTableRegions(tableName);
+ }
}
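For reference, the salt-bucket grouping decision above boils down to comparing the leading salt byte of the previous scan's start key with that of the new start key. A minimal, hypothetical version of that check (the real one is ScanUtil.crossesPrefixBoundary) could look like:

    // Hypothetical helper, not the actual ScanUtil method: does startKey fall in a
    // different salt bucket than the previous scan's start key?
    static boolean crossesSaltBucketBoundary(byte[] lastScanStartKey, byte[] startKey) {
        final int NUM_SALTING_BYTES = 1; // Phoenix uses a single-byte salt prefix
        if (lastScanStartKey.length < NUM_SALTING_BYTES || startKey.length < NUM_SALTING_BYTES) {
            return true; // an empty or short key is treated as a boundary
        }
        return lastScanStartKey[0] != startKey[0];
    }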
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index 1af96c37c6..088432ae53 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -56,9 +56,9 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
MapReduceParallelScanGrouper() {}
@Override
- public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans,
- byte[] startKey, boolean crossedRegionBoundary) {
- return !plan.isRowKeyOrdered() || crossedRegionBoundary;
+ public boolean shouldStartNewScan(QueryPlan plan, Scan lastScan,
+ byte[] startKey, boolean crossesRegionBoundary) {
+ return (!plan.isRowKeyOrdered() || crossesRegionBoundary) && lastScan != null;
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
index 5e237a5662..e53bc0a3b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
@@ -30,16 +30,16 @@ import org.apache.phoenix.compile.StatementContext;
*/
public interface ParallelScanGrouper {
- /**
- * Determines whether to create a new group of parallel scans.
- *
- * @param scans current scan group
- * @param plan current query plan
- * @param startKey start key of scan
- * @param crossedRegionBoundary whether we crossed a region boundary
- * @return true if we should create a new group of scans
- */
- boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary);
+ /**
+ * Determines whether to create a new group of parallel scans.
+ * @param plan current query plan
+ * @param lastScan the last scan processed
+ * @param startKey start key of the new scan
+ * @param crossesRegionBoundary whether startKey is in a different region than lastScan
+ * @return true if we should create a new group of scans, starting with the scan whose start
+ * key we passed as startKey
+ */
+ boolean shouldStartNewScan(QueryPlan plan, Scan lastScan, byte[] startKey, boolean crossesRegionBoundary);
List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName) throws SQLException;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java
new file mode 100644
index 0000000000..e1c99e2836
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Stores some state to help build the parallel Scans structure
+ */
+public class ParallelScansCollector {
+
+ private final ParallelScanGrouper grouper;
+ private boolean lastScanCrossedRegionBoundary = false;
+ private final List<List<Scan>> parallelScans = new ArrayList<>();
+ private List<Scan> lastBatch = new ArrayList<>();
+ private Scan lastScan = null;
+
+ public ParallelScansCollector(ParallelScanGrouper grouper) {
+ this.grouper = grouper;
+ parallelScans.add(lastBatch);
+ }
+
+ public void addNewScan(QueryPlan plan, Scan newScan, boolean crossesRegionBoundary) {
+ if (grouper.shouldStartNewScan(plan, lastScan, newScan.getStartRow(),
+ lastScanCrossedRegionBoundary)) {
+ lastBatch = new ArrayList<>();
+ parallelScans.add(lastBatch);
+ }
+ lastBatch.add(newScan);
+
+ lastScanCrossedRegionBoundary = crossesRegionBoundary;
+ lastScan = newScan;
+ }
+
+ public List<List<Scan>> getParallelScans() {
+ return parallelScans;
+ }
+}
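A rough usage sketch of the new class, matching how BaseResultIterators now drives it; plan and the ordered scan list are placeholders:

    // Sketch only: feed scans in row-key order; the grouper decides where batches split.
    ParallelScansCollector collector =
            new ParallelScansCollector(DefaultParallelScanGrouper.getInstance());
    for (Scan scan : scansInRowKeyOrder) { // scansInRowKeyOrder is a placeholder
        collector.addNewScan(plan, scan, false /* crossesRegionBoundary */);
    }
    List<List<Scan>> groups = collector.getParallelScans(); // one inner list per scan group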
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 8272eab92e..fbca85b3b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -38,6 +38,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -597,7 +598,8 @@ public class SchemaUtil {
}
// Given the splits and the rowKeySchema, find out the keys that
- public static byte[][] processSplits(byte[][] splits, LinkedHashSet<PColumn> pkColumns, Integer saltBucketNum, boolean defaultRowKeyOrder) throws SQLException {
+ public static byte[][] processSplits(byte[][] splits, Collection<PColumn> pkColumns,
+ Integer saltBucketNum, boolean defaultRowKeyOrder) throws SQLException {
// FIXME: shouldn't this return if splits.length == 0?
if (splits == null) return null;
// We do not accept user specified splits if the table is salted and we specify defaultRowKeyOrder. In this case,
@@ -611,6 +613,8 @@ public class SchemaUtil {
}
byte[][] newSplits = new byte[splits.length][];
for (int i=0; i<splits.length; i++) {
+ // Salted tables don't need this processing, but the split policy uses it for all
+ // tables, so it makes sense to apply it to them to be consistent
newSplits[i] = processSplit(splits[i], pkColumns);
}
return newSplits;
@@ -618,7 +622,7 @@ public class SchemaUtil {
// Go through each slot in the schema and try match it with the split byte array. If the split
// does not confer to the schema, extends its length to match the schema.
- private static byte[] processSplit(byte[] split, LinkedHashSet<PColumn> pkColumns) {
+ public static byte[] processSplit(byte[] split, Collection<PColumn> pkColumns) {
int pos = 0, offset = 0, maxOffset = split.length;
Iterator<PColumn> iterator = pkColumns.iterator();
while (pos < pkColumns.size()) {
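The splitPostfix used in BaseResultIterators above comes from this processing: a one-byte salt split is extended with zero bytes up to the minimum (fixed-width) primary key length. A conceptual stand-in for that padding, not the real processSplit() implementation, might be:

    // Conceptual stand-in: extend a short split key with zero bytes up to the schema's
    // minimum fixed-width primary key length.
    static byte[] padSplitToMinimumPkLength(byte[] split, int minimumPkLength) {
        if (split.length >= minimumPkLength) {
            return split;
        }
        byte[] padded = new byte[minimumPkLength];
        System.arraycopy(split, 0, padded, 0, split.length); // trailing bytes remain 0x00
        return padded;
    }

    // e.g. padSplitToMinimumPkLength(new byte[] { 0x03 }, 9)
    //      -> { 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }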
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index fd8f0903b0..4c8a1e2ade 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -160,6 +160,7 @@ import org.apache.phoenix.util.ConfigUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryBuilder;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
@@ -748,6 +749,12 @@ public abstract class BaseTest {
createTestTable(url, ddl, splits, ts);
}
+ public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+ PreparedStatement statement = conn.prepareStatement(queryBuilder.build());
+ ResultSet rs = statement.executeQuery();
+ return rs;
+ }
+
private static AtomicInteger NAME_SUFFIX = new AtomicInteger(0);
private static final int MAX_SUFFIX_VALUE = 1000000;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 60e0a108bb..fe0d8d1b2c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1274,8 +1274,7 @@ public class TestUtil {
if (realValue == null) {
assertNull("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "]", expectedValue);
} else {
- assertEquals("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "],realValue:[" +
- realValue + "],expectedValue:[" + expectedValue + "]",
+ assertEquals("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "]",
expectedValue,
realValue
);