You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/04/17 20:39:22 UTC

phoenix git commit: PHOENIX-1779 Parallelize fetching of next batch of records for scans corresponding to queries with no order by

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 cdca9377e -> bb8d7664f


PHOENIX-1779 Parallelize fetching of next batch of records for scans corresponding to queries with no order by


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bb8d7664
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bb8d7664
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bb8d7664

Branch: refs/heads/4.x-HBase-0.98
Commit: bb8d7664fc5094356b83100648877420e6d32874
Parents: cdca937
Author: Samarth <sa...@salesforce.com>
Authored: Fri Apr 17 11:27:32 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Fri Apr 17 11:27:32 2015 -0700

----------------------------------------------------------------------
 .../end2end/SkipScanAfterManualSplitIT.java     |   2 +-
 .../iterate/RoundRobinResultIteratorIT.java     | 319 ++++++++++++++++++
 .../apache/phoenix/mapreduce/IndexToolIT.java   |   3 +-
 .../org/apache/phoenix/compile/QueryPlan.java   |  11 +
 .../apache/phoenix/compile/TraceQueryPlan.java  |   5 +
 .../apache/phoenix/execute/AggregatePlan.java   |   5 +
 .../apache/phoenix/execute/BaseQueryPlan.java   |   6 +
 .../phoenix/execute/DegenerateQueryPlan.java    |   5 +
 .../phoenix/execute/DelegateQueryPlan.java      |   6 +
 .../org/apache/phoenix/execute/ScanPlan.java    |  21 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |   5 +
 .../org/apache/phoenix/execute/UnionPlan.java   |   5 +
 .../iterate/RoundRobinResultIterator.java       | 329 +++++++++++++++++++
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |   8 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   5 +
 .../phoenix/mapreduce/PhoenixRecordReader.java  |   9 +-
 .../org/apache/phoenix/query/QueryServices.java |   6 +
 .../phoenix/query/QueryServicesOptions.java     |  19 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   2 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |  22 ++
 .../phoenix/compile/QueryCompilerTest.java      |  62 +++-
 .../phoenix/filter/SkipScanBigFilterTest.java   |   2 +-
 .../query/ParallelIteratorsSplitTest.java       |   5 +
 .../phoenix/query/QueryServicesTestImpl.java    |   5 +-
 24 files changed, 841 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
index a30a668..6d08202 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
@@ -69,7 +69,7 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
         // props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
         props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32));
         // enables manual splitting on salted tables
-        props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false));
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
         props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000));
         props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
new file mode 100644
index 0000000..1e3db11
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class RoundRobinResultIteratorIT extends BaseHBaseManagedTimeIT {
+
+    private static final int NUM_SALT_BUCKETS = 4; 
+
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32));
+        /*  
+         * Don't force row key order. This causes RoundRobinResultIterator to be used if there was no order by specified
+         * on the query.
+         */
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testRoundRobinAfterTableSplit() throws Exception {
+        String tableName = "ROUNDROBINSPLIT";
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        int numRows = setupTableForSplit(tableName);
+        Connection conn = DriverManager.getConnection(getUrl());
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        int nRegions = services.getAllTableRegions(tableNameBytes).size();
+        int nRegionsBeforeSplit = nRegions;
+        HBaseAdmin admin = services.getAdmin();
+        try {
+            // Split is an async operation. So hoping 10 seconds is long enough time.
+            // If the test tends to flap, then you might want to increase the wait time
+            admin.split(tableName);
+            CountDownLatch latch = new CountDownLatch(1);
+            int nTries = 0;
+            long waitTimeMillis = 1000;
+            while (nRegions == nRegionsBeforeSplit && nTries < 10) {
+                latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
+                nRegions = services.getAllTableRegions(tableNameBytes).size();
+                nTries++;
+            }
+            
+            String query = "SELECT * FROM " + tableName;
+            Statement stmt = conn.createStatement();
+            stmt.setFetchSize(10); // this makes scanner caches to be replenished in parallel.
+            ResultSet rs = stmt.executeQuery(query);
+            int numRowsRead = 0;
+            while (rs.next()) {
+                numRowsRead++;
+            }
+            nRegions = services.getAllTableRegions(tableNameBytes).size();
+            // Region cache has been updated, as there are more regions now
+            assertNotEquals(nRegions, nRegionsBeforeSplit);
+            assertEquals(numRows, numRowsRead);
+        } finally {
+            admin.close();
+        }
+
+    }
+
+    @Test
+    public void testSelectAllRowsWithDifferentFetchSizes_salted() throws Exception {
+        testSelectAllRowsWithDifferentFetchSizes(true);
+    }
+
+    @Test
+    public void testSelectAllRowsWithDifferentFetchSizes_unsalted() throws Exception {
+        testSelectAllRowsWithDifferentFetchSizes(false);
+    }
+
+    private void testSelectAllRowsWithDifferentFetchSizes(boolean salted) throws Exception {
+        String tableName = "ALLROWS" + (salted ? "_SALTED" : "_UNSALTED");
+        int numRows = 9;
+        Set<String> expectedKeys = Collections.unmodifiableSet(createTableAndInsertRows(tableName, numRows, salted, false));
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName);
+        tryWithFetchSize(new HashSet<>(expectedKeys), 1, stmt, 0);
+        tryWithFetchSize(new HashSet<>(expectedKeys), 2, stmt, salted ? 2 : 5);
+        tryWithFetchSize(new HashSet<>(expectedKeys), numRows - 1, stmt, salted ? 0 : 1);
+        tryWithFetchSize(new HashSet<>(expectedKeys), numRows, stmt, salted ? 0 : 1);
+        tryWithFetchSize(new HashSet<>(expectedKeys), numRows + 1, stmt, salted ? 0 : 1);
+        tryWithFetchSize(new HashSet<>(expectedKeys), numRows + 2, stmt, 0);
+    }
+
+    @Test
+    public void testSelectRowsWithFilterAndDifferentFetchSizes_unsalted() throws Exception {
+        testSelectRowsWithFilterAndDifferentFetchSizes(false);
+    }
+
+    @Test
+    public void testSelectRowsWithFilterAndDifferentFetchSizes_salted() throws Exception {
+        testSelectRowsWithFilterAndDifferentFetchSizes(true);
+    }
+
+    private void testSelectRowsWithFilterAndDifferentFetchSizes(boolean salted) throws Exception {
+        String tableName = "ROWSWITHFILTER" + (salted ? "_SALTED" : "_UNSALTED");
+        int numRows = 6;
+        Set<String> insertedKeys = createTableAndInsertRows(tableName, numRows, salted, false);
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K = ?");
+        stmt.setString(1, "key1"); // will return only 1 row
+        int numRowsFiltered = 1;
+        tryWithFetchSize(Sets.newHashSet("key1"), 1, stmt, 0);
+        tryWithFetchSize(Sets.newHashSet("key1"), 2, stmt, salted ? 1 : 1);
+        tryWithFetchSize(Sets.newHashSet("key1"), 3, stmt, 0);
+
+        stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K > ?");
+        stmt.setString(1, "key2");
+        insertedKeys.remove("key1");
+        insertedKeys.remove("key2"); // query should return 4 rows after key2.
+        numRowsFiltered = 4;
+        tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0);
+        tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, salted ? 1 : 2);
+        tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered - 1, stmt, salted ? 0 : 1);
+        tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered, stmt, salted ? 0 : 1);
+        tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered + 1, stmt, salted ? 0 : 1);
+        tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered + 2, stmt, 0);
+
+        stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K > ?");
+        stmt.setString(1, "key6");
+        insertedKeys.clear(); // query should return no rows;
+        tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0);
+        tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, 0);
+        tryWithFetchSize(new HashSet<>(insertedKeys), numRows - 1, stmt, 0);
+        tryWithFetchSize(new HashSet<>(insertedKeys), numRows, stmt, 0);
+        tryWithFetchSize(new HashSet<>(insertedKeys), numRows + 1, stmt, 0);
+    }
+
+    private Set<String> createTableAndInsertRows(String tableName, int numRows, boolean salted, boolean addTableNameToKey) throws Exception {
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + (salted ? "SALT_BUCKETS=" + NUM_SALT_BUCKETS : "");
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        final Set<String> expectedKeys = new HashSet<>(numRows);
+        for (int i = 1; i <= numRows; i++) {
+            String key = (addTableNameToKey ? tableName : "") + ("key" + i);
+            expectedKeys.add(key);
+            stmt.setString(1, key);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+        return expectedKeys;
+    }
+
+    @Test
+    public void testFetchSizesAndRVCExpression() throws Exception {
+        String tableName = "RVCTest";
+        Set<String> insertedKeys = Collections.unmodifiableSet(createTableAndInsertRows(tableName, 4, false, false));
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement("SELECT K FROM " + tableName + " WHERE (K, V)  > (?, ?)");
+        stmt.setString(1, "key0");
+        stmt.setString(2, "value0");
+        tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0);
+        tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, 2);
+        tryWithFetchSize(new HashSet<>(insertedKeys), 3, stmt, 1);
+        tryWithFetchSize(new HashSet<>(insertedKeys), 4, stmt, 1);
+    }
+
+    private static void tryWithFetchSize(Set<String> expectedKeys, int fetchSize, PreparedStatement stmt, int numFetches) throws Exception {
+        stmt.setFetchSize(fetchSize);
+        ResultSet rs = stmt.executeQuery();
+        int expectedNumRows = expectedKeys.size();
+        int numRows = 0;
+        while (rs.next()) {
+            expectedKeys.remove(rs.getString(1));
+            numRows ++;
+        }
+        assertEquals("Number of rows didn't match", expectedNumRows, numRows);
+        assertTrue("Not all rows were returned for fetch size: " + fetchSize + " - " + expectedKeys, expectedKeys.size() == 0);
+        assertRoundRobinBehavior(rs, stmt, numFetches);
+    }
+
+    private static int setupTableForSplit(String tableName) throws Exception {
+        int batchSize = 25;
+        int maxFileSize = 1024 * 10;
+        int payLoadSize = 1024;
+        String payload;
+        StringBuilder buf = new StringBuilder();
+        for (int i = 0; i < payLoadSize; i++) {
+            buf.append('a');
+        }
+        payload = buf.toString();
+
+        int MIN_CHAR = 'a';
+        int MAX_CHAR = 'z';
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE " + tableName + "("
+                + "a VARCHAR PRIMARY KEY, b VARCHAR) " 
+                + HTableDescriptor.MAX_FILESIZE + "=" + maxFileSize + ","
+                + " SALT_BUCKETS = " + NUM_SALT_BUCKETS);
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)");
+        int rowCount = 0;
+        for (int c1 = MIN_CHAR; c1 <= MAX_CHAR; c1++) {
+            for (int c2 = MIN_CHAR; c2 <= MAX_CHAR; c2++) {
+                String pk = Character.toString((char)c1) + Character.toString((char)c2);
+                stmt.setString(1, pk);
+                stmt.setString(2, payload);
+                stmt.execute();
+                rowCount++;
+                if (rowCount % batchSize == 0) {
+                    conn.commit();
+                }
+            }
+        }
+        conn.commit();
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HBaseAdmin admin = services.getAdmin();
+        try {
+            admin.flush(tableName);
+        } finally {
+            admin.close();
+        }
+        conn.close();
+        return rowCount;
+    }
+
+    @Test
+    public void testUnionAllSelects() throws Exception {
+        int insertedRowsA = 10;
+        int insertedRowsB = 5;
+        int insertedRowsC = 7;
+        Set<String> keySetA = createTableAndInsertRows("TABLEA", insertedRowsA, true, true);
+        Set<String> keySetB = createTableAndInsertRows("TABLEB", insertedRowsB, true, true);
+        Set<String> keySetC = createTableAndInsertRows("TABLEC", insertedRowsC, false, true);
+        String query = "SELECT K FROM TABLEA UNION ALL SELECT K FROM TABLEB UNION ALL SELECT K FROM TABLEC";
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement(query);
+        stmt.setFetchSize(2); // force parallel fetch of scanner cache
+        ResultSet rs = stmt.executeQuery();
+        int rowsA = 0, rowsB = 0, rowsC = 0;
+        while (rs.next()) {
+            String key = rs.getString(1);
+            if (key.startsWith("TABLEA")) {
+                rowsA++;
+            } else if (key.startsWith("TABLEB")) {
+                rowsB++;
+            } else if (key.startsWith("TABLEC")) {
+                rowsC++;
+            }
+            keySetA.remove(key);
+            keySetB.remove(key);
+            keySetC.remove(key);
+        }
+        assertEquals("Not all rows of tableA were returned", 0, keySetA.size());
+        assertEquals("Not all rows of tableB were returned", 0, keySetB.size());
+        assertEquals("Not all rows of tableC were returned", 0, keySetC.size());
+        assertEquals("Number of rows retrieved didn't match for tableA", insertedRowsA, rowsA);
+        assertEquals("Number of rows retrieved didnt match for tableB", insertedRowsB, rowsB);
+        assertEquals("Number of rows retrieved didn't match for tableC", insertedRowsC, rowsC);
+    }
+
+
+    private static ResultIterator getResultIterator(ResultSet rs) throws SQLException {
+        return rs.unwrap(PhoenixResultSet.class).getUnderlyingIterator();
+    }
+
+    private static void assertRoundRobinBehavior(ResultSet rs, Statement stmt, int numFetches) throws SQLException {
+        ResultIterator itr = getResultIterator(rs);
+        if (stmt.getFetchSize() > 1) {
+            assertTrue(itr instanceof RoundRobinResultIterator);
+            RoundRobinResultIterator roundRobinItr = (RoundRobinResultIterator)itr;
+            assertEquals(numFetches, roundRobinItr.getNumberOfParallelFetches());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
index 2b7b16b..6761275 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
@@ -249,8 +249,7 @@ public class IndexToolIT {
         if(isLocal) {
             final String localIndexName = MetaDataUtil.getLocalIndexTableName(dataTable);
             expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
-                + "\n    SERVER FILTER BY FIRST KEY ONLY"
-                + "\nCLIENT MERGE SORT", localIndexName);
+                + "\n    SERVER FILTER BY FIRST KEY ONLY", localIndexName);
         } else {
             expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
                     + "\n    SERVER FILTER BY FIRST KEY ONLY",indxTable);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index a76993c..d0c63fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.compile;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -69,4 +70,14 @@ public interface QueryPlan extends StatementPlan {
     public boolean isDegenerate();
     
     public boolean isRowKeyOrdered();
+    
+    /**
+     * 
+     * @return whether underlying {@link ResultScanner} can be picked up in a round-robin 
+     * fashion. Generally, selecting scanners in such a fashion is possible if rows don't
+     * have to be returned back in a certain order.
+     * @throws SQLException 
+     */
+    public boolean useRoundRobinIterator() throws SQLException;
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 815ac1e..12360f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -232,4 +232,9 @@ public class TraceQueryPlan implements QueryPlan {
     public boolean isRowKeyOrdered() {
         return false;
     }
+    
+    @Override
+    public boolean useRoundRobinIterator() {
+        return false;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 4f344b6..ba137f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -210,4 +210,9 @@ public class AggregatePlan extends BaseQueryPlan {
         }
         return resultScanner;
     }
+
+    @Override
+    public boolean useRoundRobinIterator() throws SQLException {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 94233c8..c4e0345 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -51,6 +54,8 @@ import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
@@ -393,4 +398,5 @@ public abstract class BaseQueryPlan implements QueryPlan {
     public boolean isRowKeyOrdered() {
         return groupBy.isEmpty() ? orderBy.getOrderByExpressions().isEmpty() : groupBy.isOrderPreserving();
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 70e59b9..fda53ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -55,4 +55,9 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
         return null;
     }
 
+    @Override
+    public boolean useRoundRobinIterator() {
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 4d50ba0..7026433 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.execute;
 
 import java.sql.ParameterMetaData;
+import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -101,5 +102,10 @@ public abstract class DelegateQueryPlan implements QueryPlan {
     public boolean isRowKeyOrdered() {
         return delegate.isRowKeyOrdered();
     }
+    
+    @Override
+    public boolean useRoundRobinIterator() throws SQLException {
+        return delegate.useRoundRobinIterator();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d0a71f4..884d835 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.execute;
 
 
+import static org.apache.phoenix.util.ScanUtil.shouldRowsBeInRowKeyOrder;
+
 import java.sql.SQLException;
 import java.util.List;
 
@@ -38,6 +40,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.ResultIterators;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
@@ -54,6 +57,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,7 +132,7 @@ public class ScanPlan extends BaseQueryPlan {
     private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
             TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
 
-        if (isSerial(context, table, orderBy, limit, allowPageFilter)) {
+        if (isSerial(context, table, orderBy, limit, allowPageFilter) || ScanUtil.isRoundRobinPossible(orderBy, context)) {
             return ParallelIteratorFactory.NOOP_FACTORY;
         }
         ParallelIteratorFactory spoolingResultIteratorFactory =
@@ -182,13 +186,10 @@ public class ScanPlan extends BaseQueryPlan {
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
         } else {
-            if ((isSalted || table.getIndexType() == IndexType.LOCAL) &&
-                    (context.getConnection().getQueryServices().getProps().getBoolean(
-                            QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB,
-                            QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) ||
-                     orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY ||
-                     orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)) { // ORDER BY was optimized out b/c query is in row key order
+            if ((isSalted || table.getIndexType() == IndexType.LOCAL) && shouldRowsBeInRowKeyOrder(orderBy, context)) { 
                 scanner = new MergeSortRowKeyResultIterator(iterators, isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0, orderBy == OrderBy.REV_ROW_KEY_ORDER_BY);
+            } else if (useRoundRobinIterator()) {
+                scanner = new RoundRobinResultIterator(iterators, this);
             } else {
                 scanner = new ConcatResultIterator(iterators);
             }
@@ -202,4 +203,10 @@ public class ScanPlan extends BaseQueryPlan {
         }
         return scanner;
     }
+    
+    @Override
+    public boolean useRoundRobinIterator() throws SQLException {
+        return ScanUtil.isRoundRobinPossible(orderBy, context);
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index ce01b67..01e87e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -631,6 +631,11 @@ public class SortMergeJoinPlan implements QueryPlan {
             
         }
     }
+    
+    @Override
+    public boolean useRoundRobinIterator() {
+        return false;
+    }
 
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 973f37e..031b58b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -186,5 +186,10 @@ public class UnionPlan implements QueryPlan {
     public List<QueryPlan> getPlans() {
         return this.plans;
     }
+
+    @Override
+    public boolean useRoundRobinIterator() throws SQLException {
+        return false;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
new file mode 100644
index 0000000..4a9ad3e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+
+/**
+ * ResultIterator that keeps track of the number of records fetched by each {@link PeekingResultIterator} making sure it
+ * asks for records from each iterator in a round-robin fashion. When the iterators have fetched the scan cache size of
+ * records, it submits the iterators to the thread pool to help parallelize the I/O needed to fetch the next batch of
+ * records. This iterator assumes that the PeekingResultIterators that it manages are not nested i.e. they directly
+ * manage the underlying scanners. This kind of ResultIterator should only be used when one doesn't care about the order
+ * in which records are returned.
+ */
+public class RoundRobinResultIterator implements ResultIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger(RoundRobinResultIterator.class);
+
+    private final int threshold;
+
+    private int numScannersCacheExhausted = 0;
+    private ResultIterators resultIterators;
+
+    private List<RoundRobinIterator> openIterators = new ArrayList<>();
+
+    private int index;
+    private boolean closed;
+    private final QueryPlan plan;
+
+    // For testing purposes
+    private int numParallelFetches;
+
+    public RoundRobinResultIterator(ResultIterators iterators, QueryPlan plan) {
+        this.resultIterators = iterators;
+        this.plan = plan;
+        this.threshold = getThreshold();
+    }
+
+    public RoundRobinResultIterator(List<PeekingResultIterator> iterators, QueryPlan plan) {
+        this.resultIterators = null;
+        this.plan = plan;
+        this.threshold = getThreshold();
+        initOpenIterators(wrapToRoundRobinIterators(iterators));
+    }
+
+    public static ResultIterator newIterator(final List<PeekingResultIterator> iterators, QueryPlan plan) {
+        if (iterators.isEmpty()) { return EMPTY_ITERATOR; }
+        if (iterators.size() == 1) { return iterators.get(0); }
+        return new RoundRobinResultIterator(iterators, plan);
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        List<RoundRobinIterator> iterators;
+        int size;
+        while ((size = (iterators = getIterators()).size()) > 0) {
+            index = index % size;
+            RoundRobinIterator itr = iterators.get(index);
+            if (itr.getNumRecordsRead() < threshold) {
+                Tuple tuple;
+                if ((tuple = itr.peek()) != null) {
+                    tuple = itr.next();
+                    if (itr.getNumRecordsRead() == threshold) {
+                        numScannersCacheExhausted++;
+                    }
+                    index = (index + 1) % size;
+                    return tuple;
+                } else {
+                    // The underlying scanner is exhausted. Close the iterator and un-track it.
+                    itr.close();
+                    iterators.remove(index);
+                    if (iterators.size() == 0) {
+                        close();
+                    }
+                }
+            } else {
+                index = (index + 1) % size;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        if (closed) { return; }
+        closed = true;
+        SQLException toThrow = null;
+        try {
+            if (resultIterators != null) {
+                resultIterators.close();
+            }
+        } catch (Exception e) {
+            toThrow = ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (openIterators.size() > 0) {
+                    for (RoundRobinIterator itr : openIterators) {
+                        try {
+                            itr.close();
+                        } catch (Exception e) {
+                            if (toThrow == null) {
+                                toThrow = ServerUtil.parseServerException(e);
+                            } else {
+                                toThrow.setNextException(ServerUtil.parseServerException(e));
+                            }
+                        }
+                    }
+                }
+            } finally {
+                if (toThrow != null) { throw toThrow; }
+            }
+        }
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        if (resultIterators != null) {
+            resultIterators.explain(planSteps);
+        }
+    }
+
+    @VisibleForTesting
+    int getNumberOfParallelFetches() {
+        return numParallelFetches;
+    }
+
+    @VisibleForTesting
+    QueryPlan getQueryPlan() {
+        return plan;
+    }
+
+    private List<RoundRobinIterator> getIterators() throws SQLException {
+        if (closed) { return Collections.emptyList(); }
+        if (openIterators.size() > 0 && openIterators.size() == numScannersCacheExhausted) {
+            /*
+             * All the scanners have exhausted their cache. Submit the scanners back to the pool so that they can fetch
+             * the next batch of records in parallel.
+             */
+            initOpenIterators(fetchNextBatch());
+        } else if (openIterators.size() == 0 && resultIterators != null) {
+            List<PeekingResultIterator> iterators = resultIterators.getIterators();
+            initOpenIterators(wrapToRoundRobinIterators(iterators));
+        }
+        return openIterators;
+    }
+
+    private List<RoundRobinIterator> wrapToRoundRobinIterators(List<PeekingResultIterator> iterators) {
+        List<RoundRobinIterator> roundRobinItrs = new ArrayList<>(iterators.size());
+        for (PeekingResultIterator itr : iterators) {
+            roundRobinItrs.add(new RoundRobinIterator(itr, null));
+        }
+        return roundRobinItrs;
+    }
+
+    private void initOpenIterators(List<RoundRobinIterator> iterators) {
+        openIterators.clear();
+        openIterators.addAll(iterators);
+        index = 0;
+        numScannersCacheExhausted = 0;
+    }
+
+    private int getThreshold() {
+        int cacheSize = getScannerCacheSize();
+        checkArgument(cacheSize > 1, "RoundRobinResultIterator doesn't work when cache size is less than or equal to 1");
+        return cacheSize - 1;
+    }
+
+    private int getScannerCacheSize() {
+        try {
+            return plan.getContext().getStatement().getFetchSize();
+        } catch (Throwable e) {
+            Throwables.propagate(e);
+        }
+        return -1; // unreachable
+    }
+
+    private List<RoundRobinIterator> fetchNextBatch() throws SQLException {
+        int numExpectedIterators = openIterators.size();
+        List<Future<Tuple>> futures = new ArrayList<>(numExpectedIterators);
+        List<RoundRobinIterator> results = new ArrayList<>();
+
+        // Randomize the order in which we will be hitting region servers to try not overload particular region servers.
+        Collections.shuffle(openIterators);
+        boolean success = false;
+        SQLException toThrow = null;
+        try {
+            StatementContext context = plan.getContext();
+            final ConnectionQueryServices services = context.getConnection().getQueryServices();
+            ExecutorService executor = services.getExecutor();
+            numParallelFetches++;
+            if (logger.isDebugEnabled()) {
+                logger.debug("Performing parallel fetch for " + openIterators.size() + " iterators. ");
+            }
+            for (final RoundRobinIterator itr : openIterators) {
+                Future<Tuple> future = executor.submit(new Callable<Tuple>() {
+                    @Override
+                    public Tuple call() throws Exception {
+                        // Read the next record to refill the scanner's cache.
+                        return itr.next();
+                    }
+                });
+                futures.add(future);
+            }
+            int i = 0;
+            for (Future<Tuple> future : futures) {
+                Tuple tuple = future.get();
+                if (tuple != null) {
+                    results.add(new RoundRobinIterator(openIterators.get(i), tuple));
+                } else {
+                    // Underlying scanner is exhausted. So close it.
+                    openIterators.get(i).close();
+                }
+                i++;
+            }
+            success = true;
+            return results;
+        } catch (SQLException e) {
+            toThrow = e;
+        } catch (Exception e) {
+            toThrow = ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (!success) {
+                    try {
+                        close();
+                    } catch (Exception e) {
+                        if (toThrow == null) {
+                            toThrow = ServerUtil.parseServerException(e);
+                        } else {
+                            toThrow.setNextException(ServerUtil.parseServerException(e));
+                        }
+                    }
+                }
+            } finally {
+                if (toThrow != null) {
+                    FAILED_QUERY.increment();
+                    throw toThrow;
+                }
+            }
+        }
+        return null; // Not reachable
+    }
+
+    /**
+     * Inner class that delegates to {@link PeekingResultIterator} keeping track the number of records it has read. Also
+     * keeps track of the tuple the {@link PeekingResultIterator} read in the previous next() call before it ran out of
+     * underlying scanner cache.
+     */
+    private class RoundRobinIterator implements PeekingResultIterator {
+
+        private PeekingResultIterator delegate;
+        private Tuple tuple;
+        private int numRecordsRead;
+
+        private RoundRobinIterator(PeekingResultIterator itr, Tuple tuple) {
+            this.delegate = itr;
+            this.tuple = tuple;
+            this.numRecordsRead = 0;
+        }
+
+        @Override
+        public void close() throws SQLException {
+            delegate.close();
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            if (tuple != null) {
+                Tuple t = tuple;
+                tuple = null;
+                return t;
+            }
+            numRecordsRead++;
+            return delegate.next();
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {
+            delegate.explain(planSteps);
+        }
+
+        @Override
+        public Tuple peek() throws SQLException {
+            if (tuple != null) { return tuple; }
+            return delegate.peek();
+        }
+
+        public int getNumRecordsRead() {
+            return numRecordsRead;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 49e384c..8ee56ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -65,9 +65,10 @@ import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.SQLCloseable;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 
 /**
@@ -1255,4 +1256,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
     public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
         return (T) getObject(columnLabel); // Just ignore type since we only support built-in types
     }
+    
+    @VisibleForTesting
+    public ResultIterator getUnderlyingIterator() {
+        return scanner;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 465d84a..21b641b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -489,6 +489,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public boolean isRowKeyOrdered() {
                     return true;
                 }
+
+                @Override
+                public boolean useRoundRobinIterator() throws SQLException {
+                    return false;
+                }
             };
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 2074658..eb6dc3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -100,11 +101,11 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
         try {
             List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
             for (Scan scan : scans) {
-                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(), scan);
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }
-            ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
+            ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
             if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
                 iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
             }
@@ -117,8 +118,8 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             Throwables.propagate(e);
         }
    }
-
-    @Override
+    
+   @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
         if (key == null) {
             key = NullWritable.get();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 23f3288..4a5b304 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -82,7 +82,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
     public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
     public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
+    
+    // Deprecated. Use FORCE_ROW_KEY_ORDER instead.
     public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB  = "phoenix.query.rowKeyOrderSaltedTable";
+    
     public static final String USE_INDEXES_ATTRIB  = "phoenix.query.useIndexes";
     public static final String IMMUTABLE_ROWS_ATTRIB  = "phoenix.mutate.immutableRows";
     public static final String INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB  = "phoenix.index.mutableBatchSizeThreshold";
@@ -158,6 +161,9 @@ public interface QueryServices extends SQLCloseable {
     // rpc queue configs
     public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
     public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
+    
+    public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
+    
 
     /**
      * Get executor service used for parallel scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3912ea5..cf62333 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB
 import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
@@ -100,7 +101,6 @@ public class QueryServicesOptions {
     public static final String DEFAULT_DATE_FORMAT_TIMEZONE = DateUtil.DEFAULT_TIME_ZONE_ID;
     public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true;
     public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
-    public static final boolean DEFAULT_ROW_KEY_ORDER_SALTED_TABLE = true; // Merge sort on client to ensure salted tables are row key ordered
     public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes
     public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated
     public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also.
@@ -191,6 +191,7 @@ public class QueryServicesOptions {
     private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
     
     public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
+    public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false;
 
     private final Configuration config;
 
@@ -229,7 +230,7 @@ public class QueryServicesOptions {
             .setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS)
             .setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN)
             .setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE)
-            .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_ROW_KEY_ORDER_SALTED_TABLE)
+            .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
             .setIfUnset(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES)
             .setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
             .setIfUnset(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD)
@@ -246,6 +247,7 @@ public class QueryServicesOptions {
             .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
             .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
             .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
+            .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
@@ -352,10 +354,6 @@ public class QueryServicesOptions {
         return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize);
     }
 
-    public QueryServicesOptions setRowKeyOrderSaltedTable(boolean rowKeyOrderSaltedTable) {
-        return set(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, rowKeyOrderSaltedTable);
-    }
-
     public QueryServicesOptions setDropMetaData(boolean dropMetadata) {
         return set(DROP_METADATA_ATTRIB, dropMetadata);
     }
@@ -452,6 +450,10 @@ public class QueryServicesOptions {
     public boolean isUseByteBasedRegex() {
         return config.getBoolean(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX);
     }
+    
+    public int getScanCacheSize() {
+        return config.getInt(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE);
+    }
 
     public QueryServicesOptions setMaxServerCacheTTLMs(int ttl) {
         return set(MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, ttl);
@@ -531,4 +533,9 @@ public class QueryServicesOptions {
         config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag);
         return this;
     }
+    
+    public QueryServicesOptions setForceRowKeyOrder(boolean forceRowKeyOrder) {
+        config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e414039..22208f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1696,7 +1696,7 @@ public class MetaDataClient {
                 splits = getSplitKeys(connection.getQueryServices().getAllTableRegions(parent.getPhysicalName().getBytes()));
             } else {
                 splits = SchemaUtil.processSplits(splits, pkColumns, saltBucketNum, connection.getQueryServices().getProps().getBoolean(
-                    QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE));
+                        QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER));
             }
             MetaDataMutationResult result = connection.getQueryServices().createTable(
                     tableMetaData,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 2268866..a1473ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.util;
 
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
 
 import java.io.IOException;
@@ -38,7 +40,9 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.filter.BooleanExpressionFilter;
@@ -46,6 +50,8 @@ import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.RowKeySchema;
@@ -658,4 +664,20 @@ public class ScanUtil {
         }
         return filterIterator;
     }
+    
+    public static boolean isRoundRobinPossible(OrderBy orderBy, StatementContext context) throws SQLException {
+        int fetchSize  = context.getStatement().getFetchSize();
+        /*
+         * Selecting underlying scanners in a round-robin fashion is possible if there is no ordering of rows needed,
+         * not even row key order. Also no point doing round robin of scanners if fetch size
+         * is 1.
+         */
+        return fetchSize > 1 && !shouldRowsBeInRowKeyOrder(orderBy, context) && orderBy.getOrderByExpressions().isEmpty();
+    }
+    
+    public static boolean shouldRowsBeInRowKeyOrder(OrderBy orderBy, StatementContext context) {
+        boolean forceRowKeyOrder = context.getConnection().getQueryServices().getProps()
+                .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER);
+        return forceRowKeyOrder || orderBy == FWD_ROW_KEY_ORDER_BY || orderBy == REV_ROW_KEY_ORDER_BY;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 77eb237..7be8eae 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnAlreadyExistsException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -1732,5 +1733,64 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query);
             assertFalse("Expected group by not to be order preserving: " + query, plan.getGroupBy().isOrderPreserving());
         }
-    }    
+    }
+    
+    @Test
+    public void testUseRoundRobinIterator() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))");
+        String[] queries = {
+                "SELECT 1 FROM T ",
+                "SELECT 1 FROM T WHERE V = 'c'",
+                "SELECT 1 FROM T WHERE (k1,k2, k3) > ('a', 'ab', 1)",
+                };
+        String query;
+        for (int i = 0; i < queries.length; i++) {
+            query = queries[i];
+            QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query);
+            assertTrue("Expected plan to use round robin iterator " + query, plan.useRoundRobinIterator());
+        }
+    }
+    
+    @Test
+    public void testForcingRowKeyOrderNotUseRoundRobinIterator() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(true));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))");
+        String[] queries = {
+                "SELECT 1 FROM T ",
+                "SELECT 1 FROM T WHERE V = 'c'",
+                "SELECT 1 FROM T WHERE (k1, k2, k3) > ('a', 'ab', 1)",
+                };
+        String query;
+        for (int i = 0; i < queries.length; i++) {
+            query = queries[i];
+            QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query);
+            assertFalse("Expected plan to not use round robin iterator " + query, plan.useRoundRobinIterator());
+        }
+    }
+    
+    @Test
+    public void testPlanForOrderByOrGroupByNotUseRoundRobin() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))");
+        String[] queries = {
+                "SELECT 1 FROM T ORDER BY K1",
+                "SELECT 1 FROM T WHERE V = 'c' ORDER BY K1, K2",
+                "SELECT 1 FROM T WHERE (k1,k2, k3) > ('a', 'ab', 1) ORDER BY V",
+                "SELECT 1 FROM T GROUP BY V",
+                "SELECT 1 FROM T GROUP BY K1, V, K2 ORDER BY V",
+                };
+        String query;
+        for (int i = 0; i < queries.length; i++) {
+            query = queries[i];
+            QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query);
+            assertFalse("Expected plan to not use round robin iterator " + query, plan.useRoundRobinIterator());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
index 29e14bf..e645383 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
@@ -600,7 +600,7 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest {
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
         // enables manual splitting on salted tables
-        props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false));
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
         initDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index f929eb4..6f2a2f1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -411,6 +411,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             public List<List<Scan>> getScans() {
                 return null;
             }
+
+            @Override
+            public boolean useRoundRobinIterator() {
+                return false;
+            }
             
         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
         List<KeyRange> keyRanges = parallelIterators.getSplits();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index c1b7f99..9fee78f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -59,6 +59,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
      * value overwhelms our mini clusters.
      */
     public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = 4;
+    public static final boolean DEFAULT_FORCE_ROWKEY_ORDER = true;
 
     
     public QueryServicesTestImpl(ReadOnlyProps defaultProps) {
@@ -79,7 +80,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
                 .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS)
                 .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC)
                 .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)
-                .setRowKeyOrderSaltedTable(true)
                 .setMaxServerCacheTTLMs(DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS)
                 .setMasterInfoPort(DEFAULT_MASTER_INFO_PORT)
                 .setRegionServerInfoPort(DEFAULT_REGIONSERVER_INFO_PORT)
@@ -88,7 +88,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
                 .setWALEditCodec(DEFAULT_WAL_EDIT_CODEC)
                 .setDropMetaData(DEFAULT_DROP_METADATA)
                 .setMaxClientMetaDataCacheSize(DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE)
-                .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE);
+                .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE)
+                .setForceRowKeyOrder(DEFAULT_FORCE_ROWKEY_ORDER);
     }
     
     public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) {