You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/04/17 20:39:22 UTC
phoenix git commit: PHOENIX-1779 Parallelize fetching of next batch
of records for scans corresponding to queries with no order by
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 cdca9377e -> bb8d7664f
PHOENIX-1779 Parallelize fetching of next batch of records for scans corresponding to queries with no order by
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bb8d7664
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bb8d7664
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bb8d7664
Branch: refs/heads/4.x-HBase-0.98
Commit: bb8d7664fc5094356b83100648877420e6d32874
Parents: cdca937
Author: Samarth <sa...@salesforce.com>
Authored: Fri Apr 17 11:27:32 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Fri Apr 17 11:27:32 2015 -0700
----------------------------------------------------------------------
.../end2end/SkipScanAfterManualSplitIT.java | 2 +-
.../iterate/RoundRobinResultIteratorIT.java | 319 ++++++++++++++++++
.../apache/phoenix/mapreduce/IndexToolIT.java | 3 +-
.../org/apache/phoenix/compile/QueryPlan.java | 11 +
.../apache/phoenix/compile/TraceQueryPlan.java | 5 +
.../apache/phoenix/execute/AggregatePlan.java | 5 +
.../apache/phoenix/execute/BaseQueryPlan.java | 6 +
.../phoenix/execute/DegenerateQueryPlan.java | 5 +
.../phoenix/execute/DelegateQueryPlan.java | 6 +
.../org/apache/phoenix/execute/ScanPlan.java | 21 +-
.../phoenix/execute/SortMergeJoinPlan.java | 5 +
.../org/apache/phoenix/execute/UnionPlan.java | 5 +
.../iterate/RoundRobinResultIterator.java | 329 +++++++++++++++++++
.../apache/phoenix/jdbc/PhoenixResultSet.java | 8 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 5 +
.../phoenix/mapreduce/PhoenixRecordReader.java | 9 +-
.../org/apache/phoenix/query/QueryServices.java | 6 +
.../phoenix/query/QueryServicesOptions.java | 19 +-
.../apache/phoenix/schema/MetaDataClient.java | 2 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 22 ++
.../phoenix/compile/QueryCompilerTest.java | 62 +++-
.../phoenix/filter/SkipScanBigFilterTest.java | 2 +-
.../query/ParallelIteratorsSplitTest.java | 5 +
.../phoenix/query/QueryServicesTestImpl.java | 5 +-
24 files changed, 841 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
index a30a668..6d08202 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
@@ -69,7 +69,7 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
// props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32));
// enables manual splitting on salted tables
- props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false));
+ props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000));
props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
new file mode 100644
index 0000000..1e3db11
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class RoundRobinResultIteratorIT extends BaseHBaseManagedTimeIT {
+
+ private static final int NUM_SALT_BUCKETS = 4;
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32));
+ /*
+ * Don't force row key order. This causes RoundRobinResultIterator to be used if there was no order by specified
+ * on the query.
+ */
+ props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testRoundRobinAfterTableSplit() throws Exception {
+ String tableName = "ROUNDROBINSPLIT";
+ byte[] tableNameBytes = Bytes.toBytes(tableName);
+ int numRows = setupTableForSplit(tableName);
+ Connection conn = DriverManager.getConnection(getUrl());
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ int nRegions = services.getAllTableRegions(tableNameBytes).size();
+ int nRegionsBeforeSplit = nRegions;
+ HBaseAdmin admin = services.getAdmin();
+ try {
+ // Split is an async operation. So hoping 10 seconds is long enough time.
+ // If the test tends to flap, then you might want to increase the wait time
+ admin.split(tableName);
+ CountDownLatch latch = new CountDownLatch(1);
+ int nTries = 0;
+ long waitTimeMillis = 1000;
+ while (nRegions == nRegionsBeforeSplit && nTries < 10) {
+ latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
+ nRegions = services.getAllTableRegions(tableNameBytes).size();
+ nTries++;
+ }
+
+ String query = "SELECT * FROM " + tableName;
+ Statement stmt = conn.createStatement();
+ stmt.setFetchSize(10); // this makes scanner caches to be replenished in parallel.
+ ResultSet rs = stmt.executeQuery(query);
+ int numRowsRead = 0;
+ while (rs.next()) {
+ numRowsRead++;
+ }
+ nRegions = services.getAllTableRegions(tableNameBytes).size();
+ // Region cache has been updated, as there are more regions now
+ assertNotEquals(nRegions, nRegionsBeforeSplit);
+ assertEquals(numRows, numRowsRead);
+ } finally {
+ admin.close();
+ }
+
+ }
+
+ @Test
+ public void testSelectAllRowsWithDifferentFetchSizes_salted() throws Exception {
+ testSelectAllRowsWithDifferentFetchSizes(true);
+ }
+
+ @Test
+ public void testSelectAllRowsWithDifferentFetchSizes_unsalted() throws Exception {
+ testSelectAllRowsWithDifferentFetchSizes(false);
+ }
+
+ private void testSelectAllRowsWithDifferentFetchSizes(boolean salted) throws Exception {
+ String tableName = "ALLROWS" + (salted ? "_SALTED" : "_UNSALTED");
+ int numRows = 9;
+ Set<String> expectedKeys = Collections.unmodifiableSet(createTableAndInsertRows(tableName, numRows, salted, false));
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName);
+ tryWithFetchSize(new HashSet<>(expectedKeys), 1, stmt, 0);
+ tryWithFetchSize(new HashSet<>(expectedKeys), 2, stmt, salted ? 2 : 5);
+ tryWithFetchSize(new HashSet<>(expectedKeys), numRows - 1, stmt, salted ? 0 : 1);
+ tryWithFetchSize(new HashSet<>(expectedKeys), numRows, stmt, salted ? 0 : 1);
+ tryWithFetchSize(new HashSet<>(expectedKeys), numRows + 1, stmt, salted ? 0 : 1);
+ tryWithFetchSize(new HashSet<>(expectedKeys), numRows + 2, stmt, 0);
+ }
+
+ @Test
+ public void testSelectRowsWithFilterAndDifferentFetchSizes_unsalted() throws Exception {
+ testSelectRowsWithFilterAndDifferentFetchSizes(false);
+ }
+
+ @Test
+ public void testSelectRowsWithFilterAndDifferentFetchSizes_salted() throws Exception {
+ testSelectRowsWithFilterAndDifferentFetchSizes(true);
+ }
+
+ private void testSelectRowsWithFilterAndDifferentFetchSizes(boolean salted) throws Exception {
+ String tableName = "ROWSWITHFILTER" + (salted ? "_SALTED" : "_UNSALTED");
+ int numRows = 6;
+ Set<String> insertedKeys = createTableAndInsertRows(tableName, numRows, salted, false);
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K = ?");
+ stmt.setString(1, "key1"); // will return only 1 row
+ int numRowsFiltered = 1;
+ tryWithFetchSize(Sets.newHashSet("key1"), 1, stmt, 0);
+ tryWithFetchSize(Sets.newHashSet("key1"), 2, stmt, salted ? 1 : 1);
+ tryWithFetchSize(Sets.newHashSet("key1"), 3, stmt, 0);
+
+ stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K > ?");
+ stmt.setString(1, "key2");
+ insertedKeys.remove("key1");
+ insertedKeys.remove("key2"); // query should return 4 rows after key2.
+ numRowsFiltered = 4;
+ tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0);
+ tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, salted ? 1 : 2);
+ tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered - 1, stmt, salted ? 0 : 1);
+ tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered, stmt, salted ? 0 : 1);
+ tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered + 1, stmt, salted ? 0 : 1);
+ tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered + 2, stmt, 0);
+
+ stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K > ?");
+ stmt.setString(1, "key6");
+ insertedKeys.clear(); // query should return no rows;
+ tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0);
+ tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, 0);
+ tryWithFetchSize(new HashSet<>(insertedKeys), numRows - 1, stmt, 0);
+ tryWithFetchSize(new HashSet<>(insertedKeys), numRows, stmt, 0);
+ tryWithFetchSize(new HashSet<>(insertedKeys), numRows + 1, stmt, 0);
+ }
+
+ private Set<String> createTableAndInsertRows(String tableName, int numRows, boolean salted, boolean addTableNameToKey) throws Exception {
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + (salted ? "SALT_BUCKETS=" + NUM_SALT_BUCKETS : "");
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute(ddl);
+ String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ final Set<String> expectedKeys = new HashSet<>(numRows);
+ for (int i = 1; i <= numRows; i++) {
+ String key = (addTableNameToKey ? tableName : "") + ("key" + i);
+ expectedKeys.add(key);
+ stmt.setString(1, key);
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ return expectedKeys;
+ }
+
+ @Test
+ public void testFetchSizesAndRVCExpression() throws Exception {
+ String tableName = "RVCTest";
+ Set<String> insertedKeys = Collections.unmodifiableSet(createTableAndInsertRows(tableName, 4, false, false));
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement("SELECT K FROM " + tableName + " WHERE (K, V) > (?, ?)");
+ stmt.setString(1, "key0");
+ stmt.setString(2, "value0");
+ tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0);
+ tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, 2);
+ tryWithFetchSize(new HashSet<>(insertedKeys), 3, stmt, 1);
+ tryWithFetchSize(new HashSet<>(insertedKeys), 4, stmt, 1);
+ }
+
+ private static void tryWithFetchSize(Set<String> expectedKeys, int fetchSize, PreparedStatement stmt, int numFetches) throws Exception {
+ stmt.setFetchSize(fetchSize);
+ ResultSet rs = stmt.executeQuery();
+ int expectedNumRows = expectedKeys.size();
+ int numRows = 0;
+ while (rs.next()) {
+ expectedKeys.remove(rs.getString(1));
+ numRows ++;
+ }
+ assertEquals("Number of rows didn't match", expectedNumRows, numRows);
+ assertTrue("Not all rows were returned for fetch size: " + fetchSize + " - " + expectedKeys, expectedKeys.size() == 0);
+ assertRoundRobinBehavior(rs, stmt, numFetches);
+ }
+
+ private static int setupTableForSplit(String tableName) throws Exception {
+ int batchSize = 25;
+ int maxFileSize = 1024 * 10;
+ int payLoadSize = 1024;
+ String payload;
+ StringBuilder buf = new StringBuilder();
+ for (int i = 0; i < payLoadSize; i++) {
+ buf.append('a');
+ }
+ payload = buf.toString();
+
+ int MIN_CHAR = 'a';
+ int MAX_CHAR = 'z';
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName + "("
+ + "a VARCHAR PRIMARY KEY, b VARCHAR) "
+ + HTableDescriptor.MAX_FILESIZE + "=" + maxFileSize + ","
+ + " SALT_BUCKETS = " + NUM_SALT_BUCKETS);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)");
+ int rowCount = 0;
+ for (int c1 = MIN_CHAR; c1 <= MAX_CHAR; c1++) {
+ for (int c2 = MIN_CHAR; c2 <= MAX_CHAR; c2++) {
+ String pk = Character.toString((char)c1) + Character.toString((char)c2);
+ stmt.setString(1, pk);
+ stmt.setString(2, payload);
+ stmt.execute();
+ rowCount++;
+ if (rowCount % batchSize == 0) {
+ conn.commit();
+ }
+ }
+ }
+ conn.commit();
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HBaseAdmin admin = services.getAdmin();
+ try {
+ admin.flush(tableName);
+ } finally {
+ admin.close();
+ }
+ conn.close();
+ return rowCount;
+ }
+
+ @Test
+ public void testUnionAllSelects() throws Exception {
+ int insertedRowsA = 10;
+ int insertedRowsB = 5;
+ int insertedRowsC = 7;
+ Set<String> keySetA = createTableAndInsertRows("TABLEA", insertedRowsA, true, true);
+ Set<String> keySetB = createTableAndInsertRows("TABLEB", insertedRowsB, true, true);
+ Set<String> keySetC = createTableAndInsertRows("TABLEC", insertedRowsC, false, true);
+ String query = "SELECT K FROM TABLEA UNION ALL SELECT K FROM TABLEB UNION ALL SELECT K FROM TABLEC";
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement(query);
+ stmt.setFetchSize(2); // force parallel fetch of scanner cache
+ ResultSet rs = stmt.executeQuery();
+ int rowsA = 0, rowsB = 0, rowsC = 0;
+ while (rs.next()) {
+ String key = rs.getString(1);
+ if (key.startsWith("TABLEA")) {
+ rowsA++;
+ } else if (key.startsWith("TABLEB")) {
+ rowsB++;
+ } else if (key.startsWith("TABLEC")) {
+ rowsC++;
+ }
+ keySetA.remove(key);
+ keySetB.remove(key);
+ keySetC.remove(key);
+ }
+ assertEquals("Not all rows of tableA were returned", 0, keySetA.size());
+ assertEquals("Not all rows of tableB were returned", 0, keySetB.size());
+ assertEquals("Not all rows of tableC were returned", 0, keySetC.size());
+ assertEquals("Number of rows retrieved didn't match for tableA", insertedRowsA, rowsA);
+ assertEquals("Number of rows retrieved didnt match for tableB", insertedRowsB, rowsB);
+ assertEquals("Number of rows retrieved didn't match for tableC", insertedRowsC, rowsC);
+ }
+
+
+ private static ResultIterator getResultIterator(ResultSet rs) throws SQLException {
+ return rs.unwrap(PhoenixResultSet.class).getUnderlyingIterator();
+ }
+
+ private static void assertRoundRobinBehavior(ResultSet rs, Statement stmt, int numFetches) throws SQLException {
+ ResultIterator itr = getResultIterator(rs);
+ if (stmt.getFetchSize() > 1) {
+ assertTrue(itr instanceof RoundRobinResultIterator);
+ RoundRobinResultIterator roundRobinItr = (RoundRobinResultIterator)itr;
+ assertEquals(numFetches, roundRobinItr.getNumberOfParallelFetches());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
index 2b7b16b..6761275 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
@@ -249,8 +249,7 @@ public class IndexToolIT {
if(isLocal) {
final String localIndexName = MetaDataUtil.getLocalIndexTableName(dataTable);
expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
- + "\n SERVER FILTER BY FIRST KEY ONLY"
- + "\nCLIENT MERGE SORT", localIndexName);
+ + "\n SERVER FILTER BY FIRST KEY ONLY", localIndexName);
} else {
expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
+ "\n SERVER FILTER BY FIRST KEY ONLY",indxTable);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index a76993c..d0c63fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.compile;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -69,4 +70,14 @@ public interface QueryPlan extends StatementPlan {
public boolean isDegenerate();
public boolean isRowKeyOrdered();
+
+ /**
+ *
+ * @return whether underlying {@link ResultScanner} can be picked up in a round-robin
+ * fashion. Generally, selecting scanners in such a fashion is possible if rows don't
+ * have to be returned back in a certain order.
+ * @throws SQLException
+ */
+ public boolean useRoundRobinIterator() throws SQLException;
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 815ac1e..12360f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -232,4 +232,9 @@ public class TraceQueryPlan implements QueryPlan {
public boolean isRowKeyOrdered() {
return false;
}
+
+ @Override
+ public boolean useRoundRobinIterator() {
+ return false;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 4f344b6..ba137f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -210,4 +210,9 @@ public class AggregatePlan extends BaseQueryPlan {
}
return resultScanner;
}
+
+ @Override
+ public boolean useRoundRobinIterator() throws SQLException {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 94233c8..c4e0345 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.execute;
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
+
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -51,6 +54,8 @@ import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
@@ -393,4 +398,5 @@ public abstract class BaseQueryPlan implements QueryPlan {
public boolean isRowKeyOrdered() {
return groupBy.isEmpty() ? orderBy.getOrderByExpressions().isEmpty() : groupBy.isOrderPreserving();
}
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 70e59b9..fda53ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -55,4 +55,9 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
return null;
}
+ @Override
+ public boolean useRoundRobinIterator() {
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 4d50ba0..7026433 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.execute;
import java.sql.ParameterMetaData;
+import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
@@ -101,5 +102,10 @@ public abstract class DelegateQueryPlan implements QueryPlan {
public boolean isRowKeyOrdered() {
return delegate.isRowKeyOrdered();
}
+
+ @Override
+ public boolean useRoundRobinIterator() throws SQLException {
+ return delegate.useRoundRobinIterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d0a71f4..884d835 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.execute;
+import static org.apache.phoenix.util.ScanUtil.shouldRowsBeInRowKeyOrder;
+
import java.sql.SQLException;
import java.util.List;
@@ -38,6 +40,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelIterators;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.ResultIterators;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
@@ -54,6 +57,7 @@ import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,7 +132,7 @@ public class ScanPlan extends BaseQueryPlan {
private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
- if (isSerial(context, table, orderBy, limit, allowPageFilter)) {
+ if (isSerial(context, table, orderBy, limit, allowPageFilter) || ScanUtil.isRoundRobinPossible(orderBy, context)) {
return ParallelIteratorFactory.NOOP_FACTORY;
}
ParallelIteratorFactory spoolingResultIteratorFactory =
@@ -182,13 +186,10 @@ public class ScanPlan extends BaseQueryPlan {
if (isOrdered) {
scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
} else {
- if ((isSalted || table.getIndexType() == IndexType.LOCAL) &&
- (context.getConnection().getQueryServices().getProps().getBoolean(
- QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB,
- QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) ||
- orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY ||
- orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)) { // ORDER BY was optimized out b/c query is in row key order
+ if ((isSalted || table.getIndexType() == IndexType.LOCAL) && shouldRowsBeInRowKeyOrder(orderBy, context)) {
scanner = new MergeSortRowKeyResultIterator(iterators, isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0, orderBy == OrderBy.REV_ROW_KEY_ORDER_BY);
+ } else if (useRoundRobinIterator()) {
+ scanner = new RoundRobinResultIterator(iterators, this);
} else {
scanner = new ConcatResultIterator(iterators);
}
@@ -202,4 +203,10 @@ public class ScanPlan extends BaseQueryPlan {
}
return scanner;
}
+
+ @Override
+ public boolean useRoundRobinIterator() throws SQLException {
+ return ScanUtil.isRoundRobinPossible(orderBy, context);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index ce01b67..01e87e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -631,6 +631,11 @@ public class SortMergeJoinPlan implements QueryPlan {
}
}
+
+ @Override
+ public boolean useRoundRobinIterator() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 973f37e..031b58b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -186,5 +186,10 @@ public class UnionPlan implements QueryPlan {
public List<QueryPlan> getPlans() {
return this.plans;
}
+
+ @Override
+ public boolean useRoundRobinIterator() throws SQLException {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
new file mode 100644
index 0000000..4a9ad3e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+
+/**
+ * ResultIterator that keeps track of the number of records fetched by each {@link PeekingResultIterator} making sure it
+ * asks for records from each iterator in a round-robin fashion. When the iterators have fetched the scan cache size of
+ * records, it submits the iterators to the thread pool to help parallelize the I/O needed to fetch the next batch of
+ * records. This iterator assumes that the PeekingResultIterators that it manages are not nested i.e. they directly
+ * manage the underlying scanners. This kind of ResultIterator should only be used when one doesn't care about the order
+ * in which records are returned.
+ */
+public class RoundRobinResultIterator implements ResultIterator {
+
+ private static final Logger logger = LoggerFactory.getLogger(RoundRobinResultIterator.class);
+
+ private final int threshold;
+
+ private int numScannersCacheExhausted = 0;
+ private ResultIterators resultIterators;
+
+ private List<RoundRobinIterator> openIterators = new ArrayList<>();
+
+ private int index;
+ private boolean closed;
+ private final QueryPlan plan;
+
+ // For testing purposes
+ private int numParallelFetches;
+
+ public RoundRobinResultIterator(ResultIterators iterators, QueryPlan plan) {
+ this.resultIterators = iterators;
+ this.plan = plan;
+ this.threshold = getThreshold();
+ }
+
+ public RoundRobinResultIterator(List<PeekingResultIterator> iterators, QueryPlan plan) {
+ this.resultIterators = null;
+ this.plan = plan;
+ this.threshold = getThreshold();
+ initOpenIterators(wrapToRoundRobinIterators(iterators));
+ }
+
+ public static ResultIterator newIterator(final List<PeekingResultIterator> iterators, QueryPlan plan) {
+ if (iterators.isEmpty()) { return EMPTY_ITERATOR; }
+ if (iterators.size() == 1) { return iterators.get(0); }
+ return new RoundRobinResultIterator(iterators, plan);
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ List<RoundRobinIterator> iterators;
+ int size;
+ while ((size = (iterators = getIterators()).size()) > 0) {
+ index = index % size;
+ RoundRobinIterator itr = iterators.get(index);
+ if (itr.getNumRecordsRead() < threshold) {
+ Tuple tuple;
+ if ((tuple = itr.peek()) != null) {
+ tuple = itr.next();
+ if (itr.getNumRecordsRead() == threshold) {
+ numScannersCacheExhausted++;
+ }
+ index = (index + 1) % size;
+ return tuple;
+ } else {
+ // The underlying scanner is exhausted. Close the iterator and un-track it.
+ itr.close();
+ iterators.remove(index);
+ if (iterators.size() == 0) {
+ close();
+ }
+ }
+ } else {
+ index = (index + 1) % size;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (closed) { return; }
+ closed = true;
+ SQLException toThrow = null;
+ try {
+ if (resultIterators != null) {
+ resultIterators.close();
+ }
+ } catch (Exception e) {
+ toThrow = ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ if (openIterators.size() > 0) {
+ for (RoundRobinIterator itr : openIterators) {
+ try {
+ itr.close();
+ } catch (Exception e) {
+ if (toThrow == null) {
+ toThrow = ServerUtil.parseServerException(e);
+ } else {
+ toThrow.setNextException(ServerUtil.parseServerException(e));
+ }
+ }
+ }
+ }
+ } finally {
+ if (toThrow != null) { throw toThrow; }
+ }
+ }
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ if (resultIterators != null) {
+ resultIterators.explain(planSteps);
+ }
+ }
+
+ @VisibleForTesting
+ int getNumberOfParallelFetches() {
+ return numParallelFetches;
+ }
+
+ @VisibleForTesting
+ QueryPlan getQueryPlan() {
+ return plan;
+ }
+
+ private List<RoundRobinIterator> getIterators() throws SQLException {
+ if (closed) { return Collections.emptyList(); }
+ if (openIterators.size() > 0 && openIterators.size() == numScannersCacheExhausted) {
+ /*
+ * All the scanners have exhausted their cache. Submit the scanners back to the pool so that they can fetch
+ * the next batch of records in parallel.
+ */
+ initOpenIterators(fetchNextBatch());
+ } else if (openIterators.size() == 0 && resultIterators != null) {
+ List<PeekingResultIterator> iterators = resultIterators.getIterators();
+ initOpenIterators(wrapToRoundRobinIterators(iterators));
+ }
+ return openIterators;
+ }
+
+ private List<RoundRobinIterator> wrapToRoundRobinIterators(List<PeekingResultIterator> iterators) {
+ List<RoundRobinIterator> roundRobinItrs = new ArrayList<>(iterators.size());
+ for (PeekingResultIterator itr : iterators) {
+ roundRobinItrs.add(new RoundRobinIterator(itr, null));
+ }
+ return roundRobinItrs;
+ }
+
+ private void initOpenIterators(List<RoundRobinIterator> iterators) {
+ openIterators.clear();
+ openIterators.addAll(iterators);
+ index = 0;
+ numScannersCacheExhausted = 0;
+ }
+
+ private int getThreshold() {
+ int cacheSize = getScannerCacheSize();
+ checkArgument(cacheSize > 1, "RoundRobinResultIterator doesn't work when cache size is less than or equal to 1");
+ return cacheSize - 1;
+ }
+
+ private int getScannerCacheSize() {
+ try {
+ return plan.getContext().getStatement().getFetchSize();
+ } catch (Throwable e) {
+ Throwables.propagate(e);
+ }
+ return -1; // unreachable
+ }
+
+ private List<RoundRobinIterator> fetchNextBatch() throws SQLException {
+ int numExpectedIterators = openIterators.size();
+ List<Future<Tuple>> futures = new ArrayList<>(numExpectedIterators);
+ List<RoundRobinIterator> results = new ArrayList<>();
+
+ // Randomize the order in which we will be hitting region servers to try not overload particular region servers.
+ Collections.shuffle(openIterators);
+ boolean success = false;
+ SQLException toThrow = null;
+ try {
+ StatementContext context = plan.getContext();
+ final ConnectionQueryServices services = context.getConnection().getQueryServices();
+ ExecutorService executor = services.getExecutor();
+ numParallelFetches++;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Performing parallel fetch for " + openIterators.size() + " iterators. ");
+ }
+ for (final RoundRobinIterator itr : openIterators) {
+ Future<Tuple> future = executor.submit(new Callable<Tuple>() {
+ @Override
+ public Tuple call() throws Exception {
+ // Read the next record to refill the scanner's cache.
+ return itr.next();
+ }
+ });
+ futures.add(future);
+ }
+ int i = 0;
+ for (Future<Tuple> future : futures) {
+ Tuple tuple = future.get();
+ if (tuple != null) {
+ results.add(new RoundRobinIterator(openIterators.get(i), tuple));
+ } else {
+ // Underlying scanner is exhausted. So close it.
+ openIterators.get(i).close();
+ }
+ i++;
+ }
+ success = true;
+ return results;
+ } catch (SQLException e) {
+ toThrow = e;
+ } catch (Exception e) {
+ toThrow = ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ if (!success) {
+ try {
+ close();
+ } catch (Exception e) {
+ if (toThrow == null) {
+ toThrow = ServerUtil.parseServerException(e);
+ } else {
+ toThrow.setNextException(ServerUtil.parseServerException(e));
+ }
+ }
+ }
+ } finally {
+ if (toThrow != null) {
+ FAILED_QUERY.increment();
+ throw toThrow;
+ }
+ }
+ }
+ return null; // Not reachable
+ }
+
+ /**
+ * Inner class that delegates to {@link PeekingResultIterator} keeping track the number of records it has read. Also
+ * keeps track of the tuple the {@link PeekingResultIterator} read in the previous next() call before it ran out of
+ * underlying scanner cache.
+ */
+ private class RoundRobinIterator implements PeekingResultIterator {
+
+ private PeekingResultIterator delegate;
+ private Tuple tuple;
+ private int numRecordsRead;
+
+ private RoundRobinIterator(PeekingResultIterator itr, Tuple tuple) {
+ this.delegate = itr;
+ this.tuple = tuple;
+ this.numRecordsRead = 0;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ delegate.close();
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ if (tuple != null) {
+ Tuple t = tuple;
+ tuple = null;
+ return t;
+ }
+ numRecordsRead++;
+ return delegate.next();
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ delegate.explain(planSteps);
+ }
+
+ @Override
+ public Tuple peek() throws SQLException {
+ if (tuple != null) { return tuple; }
+ return delegate.peek();
+ }
+
+ public int getNumRecordsRead() {
+ return numRecordsRead;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 49e384c..8ee56ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -65,9 +65,10 @@ import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.SQLCloseable;
+import com.google.common.annotations.VisibleForTesting;
+
/**
@@ -1255,4 +1256,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
return (T) getObject(columnLabel); // Just ignore type since we only support built-in types
}
+
+ @VisibleForTesting
+ public ResultIterator getUnderlyingIterator() {
+ return scanner;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 465d84a..21b641b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -489,6 +489,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
public boolean isRowKeyOrdered() {
return true;
}
+
+ @Override
+ public boolean useRoundRobinIterator() throws SQLException {
+ return false;
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 2074658..eb6dc3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -100,11 +101,11 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
try {
List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
for (Scan scan : scans) {
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+ final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(), scan);
PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
- ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
+ ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
}
@@ -117,8 +118,8 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
Throwables.propagate(e);
}
}
-
- @Override
+
+ @Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (key == null) {
key = NullWritable.get();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 23f3288..4a5b304 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -82,7 +82,10 @@ public interface QueryServices extends SQLCloseable {
public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
+
+ // Deprecated. Use FORCE_ROW_KEY_ORDER instead.
public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable";
+
public static final String USE_INDEXES_ATTRIB = "phoenix.query.useIndexes";
public static final String IMMUTABLE_ROWS_ATTRIB = "phoenix.mutate.immutableRows";
public static final String INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB = "phoenix.index.mutableBatchSizeThreshold";
@@ -158,6 +161,9 @@ public interface QueryServices extends SQLCloseable {
// rpc queue configs
public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
+
+ public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
+
/**
* Get executor service used for parallel scans
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3912ea5..cf62333 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB
import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
@@ -100,7 +101,6 @@ public class QueryServicesOptions {
public static final String DEFAULT_DATE_FORMAT_TIMEZONE = DateUtil.DEFAULT_TIME_ZONE_ID;
public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true;
public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
- public static final boolean DEFAULT_ROW_KEY_ORDER_SALTED_TABLE = true; // Merge sort on client to ensure salted tables are row key ordered
public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes
public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated
public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also.
@@ -191,6 +191,7 @@ public class QueryServicesOptions {
private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
+ public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false;
private final Configuration config;
@@ -229,7 +230,7 @@ public class QueryServicesOptions {
.setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS)
.setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN)
.setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE)
- .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_ROW_KEY_ORDER_SALTED_TABLE)
+ .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
.setIfUnset(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES)
.setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
.setIfUnset(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD)
@@ -246,6 +247,7 @@ public class QueryServicesOptions {
.setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
.setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
.setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
+ .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
;
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
@@ -352,10 +354,6 @@ public class QueryServicesOptions {
return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize);
}
- public QueryServicesOptions setRowKeyOrderSaltedTable(boolean rowKeyOrderSaltedTable) {
- return set(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, rowKeyOrderSaltedTable);
- }
-
public QueryServicesOptions setDropMetaData(boolean dropMetadata) {
return set(DROP_METADATA_ATTRIB, dropMetadata);
}
@@ -452,6 +450,10 @@ public class QueryServicesOptions {
public boolean isUseByteBasedRegex() {
return config.getBoolean(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX);
}
+
+ public int getScanCacheSize() {
+ return config.getInt(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE);
+ }
public QueryServicesOptions setMaxServerCacheTTLMs(int ttl) {
return set(MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, ttl);
@@ -531,4 +533,9 @@ public class QueryServicesOptions {
config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag);
return this;
}
+
+ public QueryServicesOptions setForceRowKeyOrder(boolean forceRowKeyOrder) {
+ config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e414039..22208f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1696,7 +1696,7 @@ public class MetaDataClient {
splits = getSplitKeys(connection.getQueryServices().getAllTableRegions(parent.getPhysicalName().getBytes()));
} else {
splits = SchemaUtil.processSplits(splits, pkColumns, saltBucketNum, connection.getQueryServices().getProps().getBoolean(
- QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE));
+ QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER));
}
MetaDataMutationResult result = connection.getQueryServices().createTable(
tableMetaData,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 2268866..a1473ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.util;
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
import java.io.IOException;
@@ -38,7 +40,9 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.filter.BooleanExpressionFilter;
@@ -46,6 +50,8 @@ import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.KeyRange.Bound;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.RowKeySchema;
@@ -658,4 +664,20 @@ public class ScanUtil {
}
return filterIterator;
}
+
+ public static boolean isRoundRobinPossible(OrderBy orderBy, StatementContext context) throws SQLException {
+ int fetchSize = context.getStatement().getFetchSize();
+ /*
+ * Selecting underlying scanners in a round-robin fashion is possible if there is no ordering of rows needed,
+ * not even row key order. Also no point doing round robin of scanners if fetch size
+ * is 1.
+ */
+ return fetchSize > 1 && !shouldRowsBeInRowKeyOrder(orderBy, context) && orderBy.getOrderByExpressions().isEmpty();
+ }
+
+ public static boolean shouldRowsBeInRowKeyOrder(OrderBy orderBy, StatementContext context) {
+ boolean forceRowKeyOrder = context.getConnection().getQueryServices().getProps()
+ .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER);
+ return forceRowKeyOrder || orderBy == FWD_ROW_KEY_ORDER_BY || orderBy == REV_ROW_KEY_ORDER_BY;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 77eb237..7be8eae 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnAlreadyExistsException;
import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -1732,5 +1733,64 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query);
assertFalse("Expected group by not to be order preserving: " + query, plan.getGroupBy().isOrderPreserving());
}
- }
+ }
+
+ @Test
+ public void testUseRoundRobinIterator() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))");
+ String[] queries = {
+ "SELECT 1 FROM T ",
+ "SELECT 1 FROM T WHERE V = 'c'",
+ "SELECT 1 FROM T WHERE (k1,k2, k3) > ('a', 'ab', 1)",
+ };
+ String query;
+ for (int i = 0; i < queries.length; i++) {
+ query = queries[i];
+ QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query);
+ assertTrue("Expected plan to use round robin iterator " + query, plan.useRoundRobinIterator());
+ }
+ }
+
+ @Test
+ public void testForcingRowKeyOrderNotUseRoundRobinIterator() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(true));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))");
+ String[] queries = {
+ "SELECT 1 FROM T ",
+ "SELECT 1 FROM T WHERE V = 'c'",
+ "SELECT 1 FROM T WHERE (k1, k2, k3) > ('a', 'ab', 1)",
+ };
+ String query;
+ for (int i = 0; i < queries.length; i++) {
+ query = queries[i];
+ QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query);
+ assertFalse("Expected plan to not use round robin iterator " + query, plan.useRoundRobinIterator());
+ }
+ }
+
+ @Test
+ public void testPlanForOrderByOrGroupByNotUseRoundRobin() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))");
+ String[] queries = {
+ "SELECT 1 FROM T ORDER BY K1",
+ "SELECT 1 FROM T WHERE V = 'c' ORDER BY K1, K2",
+ "SELECT 1 FROM T WHERE (k1,k2, k3) > ('a', 'ab', 1) ORDER BY V",
+ "SELECT 1 FROM T GROUP BY V",
+ "SELECT 1 FROM T GROUP BY K1, V, K2 ORDER BY V",
+ };
+ String query;
+ for (int i = 0; i < queries.length; i++) {
+ query = queries[i];
+ QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query);
+ assertFalse("Expected plan to not use round robin iterator " + query, plan.useRoundRobinIterator());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
index 29e14bf..e645383 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
@@ -600,7 +600,7 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest {
public static void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
// enables manual splitting on salted tables
- props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false));
+ props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
initDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index f929eb4..6f2a2f1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -411,6 +411,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
public List<List<Scan>> getScans() {
return null;
}
+
+ @Override
+ public boolean useRoundRobinIterator() {
+ return false;
+ }
}, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
List<KeyRange> keyRanges = parallelIterators.getSplits();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index c1b7f99..9fee78f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -59,6 +59,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
* value overwhelms our mini clusters.
*/
public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = 4;
+ public static final boolean DEFAULT_FORCE_ROWKEY_ORDER = true;
public QueryServicesTestImpl(ReadOnlyProps defaultProps) {
@@ -79,7 +80,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
.setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS)
.setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC)
.setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)
- .setRowKeyOrderSaltedTable(true)
.setMaxServerCacheTTLMs(DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS)
.setMasterInfoPort(DEFAULT_MASTER_INFO_PORT)
.setRegionServerInfoPort(DEFAULT_REGIONSERVER_INFO_PORT)
@@ -88,7 +88,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
.setWALEditCodec(DEFAULT_WAL_EDIT_CODEC)
.setDropMetaData(DEFAULT_DROP_METADATA)
.setMaxClientMetaDataCacheSize(DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE)
- .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE);
+ .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE)
+ .setForceRowKeyOrder(DEFAULT_FORCE_ROWKEY_ORDER);
}
public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) {