Posted to commits@phoenix.apache.org by st...@apache.org on 2023/07/21 04:32:26 UTC

[phoenix] branch master updated: PHOENIX-6910 Scans created during query compilation and execution against salted tables need to be more resilient

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new bbc915437e PHOENIX-6910 Scans created during query compilation and execution against salted tables need to be more resilient
bbc915437e is described below

commit bbc915437e026fc415c20fa06136044b257ce919
Author: Istvan Toth <st...@apache.org>
AuthorDate: Mon May 15 07:33:04 2023 +0200

    PHOENIX-6910 Scans created during query compilation and execution against salted tables need to be more resilient
    
    Co-authored-by: Karthik Palanisamy <kp...@cloudera.com>
    Co-authored-by: Jacob Isaac <ja...@gmail.com>
---
 .../phoenix/end2end/ParallelStatsDisabledIT.java   |   6 -
 .../end2end/salted/SaltedTableMergeBucketsIT.java  | 487 +++++++++++++++++++++
 .../org/apache/phoenix/compile/ScanRanges.java     |  65 ++-
 .../org/apache/phoenix/compile/WhereOptimizer.java |   1 +
 .../phoenix/iterate/BaseResultIterators.java       | 106 ++---
 .../iterate/DefaultParallelScanGrouper.java        |  64 +--
 .../iterate/MapReduceParallelScanGrouper.java      |   6 +-
 .../phoenix/iterate/ParallelScanGrouper.java       |  20 +-
 .../phoenix/iterate/ParallelScansCollector.java    |  57 +++
 .../java/org/apache/phoenix/util/SchemaUtil.java   |   8 +-
 .../java/org/apache/phoenix/query/BaseTest.java    |   7 +
 .../java/org/apache/phoenix/util/TestUtil.java     |   3 +-
 12 files changed, 722 insertions(+), 108 deletions(-)
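
For context on the diff below: a salted Phoenix table prepends a single salt
byte to every row key, computed as a hash of the key modulo SALT_BUCKETS, and
the table is pre-split at each one-byte prefix. Here is a minimal,
self-contained sketch of that layout; the rolling hash is an illustrative
stand-in, not the production algorithm in Phoenix's SaltingUtil.

    import java.nio.charset.StandardCharsets;

    // Illustrative only: Phoenix derives the real salt byte in SaltingUtil;
    // the hash below is a stand-in for the sake of a runnable example.
    public class SaltSketch {
        static byte saltByte(byte[] rowKey, int buckets) {
            int hash = 0;
            for (byte b : rowKey) {
                hash = 31 * hash + b; // assumed rolling hash
            }
            return (byte) Math.floorMod(hash, buckets);
        }

        public static void main(String[] args) {
            int buckets = 11; // matches SALT_BUCKETS=11 used in the new IT
            byte[] pk = "20191204".getBytes(StandardCharsets.UTF_8);
            // The stored key is <salt byte><logical PK>, so a full table scan
            // spans `buckets` distinct one-byte key prefixes.
            System.out.printf("bucket %d of %d%n", saltByte(pk, buckets), buckets);
        }
    }

Region merges can leave a single region spanning several of those prefixes,
which is the situation this patch makes scan construction resilient to.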

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
index 782faf8389..3cd9dc7202 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
@@ -67,12 +67,6 @@ public abstract class ParallelStatsDisabledIT extends BaseTest {
         BaseTest.freeResourcesIfBeyondThreshold();
     }
 
-    public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
-        PreparedStatement statement = conn.prepareStatement(queryBuilder.build());
-        ResultSet rs = statement.executeQuery();
-        return rs;
-    }
-
     public static ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
             String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
         ResultSet rs = null;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableMergeBucketsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableMergeBucketsIT.java
new file mode 100644
index 0000000000..01fc614836
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableMergeBucketsIT.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.salted;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.assertResultSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
+import org.apache.phoenix.end2end.ParallelStatsEnabledTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryBuilder;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(ParallelStatsEnabledTest.class)
+public class SaltedTableMergeBucketsIT extends ParallelStatsEnabledIT {
+
+    @Test
+    public void testWithVariousPKTypes() throws Exception {
+
+        int[] variousSaltBuckets = new int[] { 11, 23, 31 };
+        SortOrder[][] sortOrdersCase1 =
+                new SortOrder[][] { { SortOrder.ASC, SortOrder.ASC, SortOrder.ASC },
+                        { SortOrder.ASC, SortOrder.ASC, SortOrder.DESC },
+                        { SortOrder.ASC, SortOrder.DESC, SortOrder.ASC },
+                        { SortOrder.ASC, SortOrder.DESC, SortOrder.DESC },
+                        { SortOrder.DESC, SortOrder.ASC, SortOrder.ASC },
+                        { SortOrder.DESC, SortOrder.ASC, SortOrder.DESC },
+                        { SortOrder.DESC, SortOrder.DESC, SortOrder.ASC },
+                        { SortOrder.DESC, SortOrder.DESC, SortOrder.DESC } };
+
+        SortOrder[][] sortOrdersCase2 =
+                new SortOrder[][] { { SortOrder.ASC, SortOrder.ASC, SortOrder.ASC, SortOrder.ASC },
+                        { SortOrder.ASC, SortOrder.ASC, SortOrder.ASC, SortOrder.DESC },
+                        { SortOrder.ASC, SortOrder.ASC, SortOrder.DESC, SortOrder.ASC },
+                        { SortOrder.ASC, SortOrder.ASC, SortOrder.DESC, SortOrder.DESC },
+                        { SortOrder.ASC, SortOrder.DESC, SortOrder.DESC, SortOrder.ASC },
+                        { SortOrder.ASC, SortOrder.DESC, SortOrder.DESC, SortOrder.DESC },
+                        { SortOrder.ASC, SortOrder.DESC, SortOrder.ASC, SortOrder.ASC },
+                        { SortOrder.ASC, SortOrder.DESC, SortOrder.ASC, SortOrder.DESC },
+                        { SortOrder.DESC, SortOrder.ASC, SortOrder.ASC, SortOrder.ASC },
+                        { SortOrder.DESC, SortOrder.ASC, SortOrder.ASC, SortOrder.DESC },
+                        { SortOrder.DESC, SortOrder.ASC, SortOrder.DESC, SortOrder.ASC },
+                        { SortOrder.DESC, SortOrder.ASC, SortOrder.DESC, SortOrder.DESC },
+                        { SortOrder.DESC, SortOrder.DESC, SortOrder.ASC, SortOrder.ASC },
+                        { SortOrder.DESC, SortOrder.DESC, SortOrder.ASC, SortOrder.DESC },
+                        { SortOrder.DESC, SortOrder.DESC, SortOrder.DESC, SortOrder.ASC },
+                        { SortOrder.DESC, SortOrder.DESC, SortOrder.DESC, SortOrder.DESC } };
+
+        for (int index = 0; index < sortOrdersCase1.length; index++) {
+            // Test Case 1: PK1 = Bigint, PK2 = Decimal, PK3 = Bigint
+            String baseTableNameCase1 = generateUniqueName();
+            PDataType[] testIntDecIntPKTypes =
+                    new PDataType[] { PLong.INSTANCE, PDecimal.INSTANCE, PLong.INSTANCE };
+            long nowTime1 =
+                    createTableCase1(baseTableNameCase1, variousSaltBuckets[0],
+                        testIntDecIntPKTypes[0], sortOrdersCase1[index][0], testIntDecIntPKTypes[1],
+                        sortOrdersCase1[index][1], testIntDecIntPKTypes[2],
+                        sortOrdersCase1[index][2]);
+            testIntDecIntPK(baseTableNameCase1, nowTime1, sortOrdersCase1[index]);
+            mergeRegions(baseTableNameCase1);
+            testIntDecIntPK(baseTableNameCase1, nowTime1, sortOrdersCase1[index]);
+
+            // Test Case 2: PK1 = Integer, PK2 = Integer, PK3 = Integer, PK4 = Integer
+            String baseTableNameCase2 = generateUniqueName();
+            PDataType[] testIntIntIntIntPKTypes =
+                    new PDataType[] { PInteger.INSTANCE, PInteger.INSTANCE, PInteger.INSTANCE,
+                            PInteger.INSTANCE };
+            createTableCase2(baseTableNameCase2, variousSaltBuckets[0], testIntIntIntIntPKTypes[0],
+                sortOrdersCase2[index][0], testIntIntIntIntPKTypes[1], sortOrdersCase2[index][1],
+                testIntIntIntIntPKTypes[2], sortOrdersCase2[index][2], testIntIntIntIntPKTypes[3],
+                sortOrdersCase2[index][3]);
+            doTestGroupByOrderMatchPkColumnOrderBug4690(baseTableNameCase2);
+            mergeRegions(baseTableNameCase2);
+            doTestGroupByOrderMatchPkColumnOrderBug4690(baseTableNameCase2);
+        }
+    }
+
+    private void testIntDecIntPK(String tableName, long nowTime, SortOrder[] sortOrder)
+            throws SQLException {
+        String testName = "testIntDecIntPK";
+        String testSQL1 =
+                String.format(
+                    "SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN ((%d, 21, 1),(%d, 2, 31))",
+                    tableName, nowTime, nowTime);
+        String testSQL2 =
+                String.format("SELECT ROW_ID FROM %s WHERE (ID2, ID3) IN ((21.0, 1),(2.0, 3))",
+                    tableName);
+        String testSQL3 =
+                String.format("SELECT ROW_ID FROM %s WHERE (ID1, ID2) IN ((%d, 21.0),(%d, 2.0))",
+                    tableName, nowTime + 1, nowTime + 1);
+        String testSQL4 =
+                String.format(
+                    "SELECT ROW_ID FROM %s WHERE (ID3, ID2, ID1) IN ((3, 21.0, %d),(3, 2.0, %d))",
+                    tableName, nowTime + 1, nowTime + 1);
+        String testSQL5 =
+                String.format(
+                    "SELECT ROW_ID FROM %s WHERE (ID1, ID2, ID3) IN ((%d, 21.0, 3),(%d, 2.0, 3))",
+                    tableName, nowTime + 1, nowTime + 1);
+        String testSQL6 =
+                String.format("SELECT ROW_ID FROM %s WHERE ID1 = %d AND ID2 = 2.0", tableName,
+                    nowTime);
+        String testSQL7 =
+                String.format("SELECT ROW_ID FROM %s WHERE ID1 >= %d AND ID1 < %d", tableName,
+                    nowTime, nowTime + 3);
+        String testSQL8 =
+                String.format(
+                    "SELECT ROW_ID FROM %s WHERE (ID1 = %d OR ID1 = %d OR ID1 = %d) AND (ID3 = %d)",
+                    tableName, nowTime, nowTime + 3, nowTime + 2, 5);
+
+        Set<String> expecteds1 = Collections.<String> emptySet();
+        Set<String> expecteds2 = Sets.newHashSet(new String[] { "row0", "row1" });
+        Set<String> expecteds3 = Sets.newHashSet(new String[] { "row1", "row2" });
+        Set<String> expecteds4 = Sets.newHashSet(new String[] { "row1" });
+        Set<String> expecteds5 = Sets.newHashSet(new String[] { "row1" });
+        Set<String> expecteds6 = Sets.newHashSet(new String[] { "row0" });
+        Set<String> expecteds7 = Sets.newHashSet(new String[] { "row0", "row1", "row2", "row3" });
+        Set<String> expecteds8 = Sets.newHashSet(new String[] { "row3" });
+
+        assertExpectedWithWhere(testName, testSQL1, expecteds1, expecteds1.size());
+        assertExpectedWithWhere(testName, testSQL2, expecteds2, expecteds2.size());
+        assertExpectedWithWhere(testName, testSQL3, expecteds3, expecteds3.size());
+        assertExpectedWithWhere(testName, testSQL4, expecteds4, expecteds4.size());
+        assertExpectedWithWhere(testName, testSQL5, expecteds5, expecteds5.size());
+        assertExpectedWithWhere(testName, testSQL6, expecteds6, expecteds6.size());
+        assertExpectedWithWhere(testName, testSQL7, expecteds7, expecteds7.size());
+        assertExpectedWithWhere(testName, testSQL8, expecteds8, expecteds8.size());
+    }
+
+    private void assertExpectedWithWhere(String testType, String testSQL, Set<String> expectedSet,
+            int expectedCount) throws SQLException {
+        String context = "sql: " + testSQL + ", type: " + testType;
+
+        try (Connection tenantConnection = DriverManager.getConnection(getUrl())) {
+            // perform the query
+            ResultSet rs = tenantConnection.createStatement().executeQuery(testSQL);
+            for (int i = 0; i < expectedCount; i++) {
+                assertTrue(
+                    "did not include result '" + expectedSet.toString() + "' (" + context + ")",
+                    rs.next());
+                String actual = rs.getString(1);
+                assertTrue(context, expectedSet.contains(actual));
+            }
+            assertFalse(context, rs.next());
+        }
+    }
+
+    private void doTestGroupByOrderMatchPkColumnOrderBug4690(String tableName) throws Exception {
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            QueryBuilder queryBuilder =
+                    new QueryBuilder().setSelectExpression("PK2,PK1,COUNT(V)")
+                            .setSelectExpressionColumns(Lists.newArrayList("PK1", "PK2", "V"))
+                            .setFullTableName(tableName).setGroupByClause("PK2, PK1")
+                            .setOrderByClause("PK2, PK1");
+            ResultSet rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 2, 3, 2L }, { 3, 2, 1L }, { 7, 2, 2L },
+                    { 8, 1, 2L }, { 9, 1, 4L } });
+
+            queryBuilder.setSelectExpression("PK1, PK2, COUNT(V)");
+            queryBuilder.setOrderByClause("PK1, PK2");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 1, 8, 2L }, { 1, 9, 4L }, { 2, 3, 1L },
+                    { 2, 7, 2L }, { 3, 2, 2L } });
+
+            queryBuilder.setSelectExpression("PK2,PK1,COUNT(V)");
+            queryBuilder.setOrderByClause("PK2 DESC,PK1 DESC");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 9, 1, 4L }, { 8, 1, 2L }, { 7, 2, 2L },
+                    { 3, 2, 1L }, { 2, 3, 2L } });
+
+            queryBuilder.setSelectExpression("PK1,PK2,COUNT(V)");
+            queryBuilder.setOrderByClause("PK1 DESC,PK2 DESC");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 3, 2, 2L }, { 2, 7, 2L }, { 2, 3, 1L },
+                    { 1, 9, 4L }, { 1, 8, 2L } });
+
+            queryBuilder.setSelectExpression("PK3,PK2,COUNT(V)");
+            queryBuilder.setSelectExpressionColumns(Lists.newArrayList("PK1", "PK2", "PK3", "V"));
+            queryBuilder.setFullTableName(tableName);
+            queryBuilder.setGroupByClause("PK3,PK2");
+            queryBuilder.setOrderByClause("PK3,PK2");
+            queryBuilder.setWhereClause("PK1=1");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 5, 9, 1L }, { 6, 9, 2L }, { 7, 9, 1L },
+                    { 10, 8, 1L }, { 11, 8, 1L } });
+
+            queryBuilder.setSelectExpression("PK2,PK3,COUNT(V)");
+            queryBuilder.setOrderByClause("PK2,PK3");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 8, 10, 1L }, { 8, 11, 1L }, { 9, 5, 1L },
+                    { 9, 6, 2L }, { 9, 7, 1L } });
+
+            queryBuilder.setSelectExpression("PK3,PK2,COUNT(V)");
+            queryBuilder.setOrderByClause("PK3 DESC,PK2 DESC");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 11, 8, 1L }, { 10, 8, 1L }, { 7, 9, 1L },
+                    { 6, 9, 2L }, { 5, 9, 1L } });
+
+            queryBuilder.setSelectExpression("PK2,PK3,COUNT(V)");
+            queryBuilder.setOrderByClause("PK2 DESC,PK3 DESC");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 9, 7, 1L }, { 9, 6, 2L }, { 9, 5, 1L },
+                    { 8, 11, 1L }, { 8, 10, 1L } });
+
+            queryBuilder.setSelectExpression("PK4,PK3,PK1,COUNT(V)");
+            queryBuilder.setSelectExpressionColumns(
+                Lists.newArrayList("PK1", "PK2", "PK3", "PK4", "V"));
+            queryBuilder.setFullTableName(tableName);
+            queryBuilder.setWhereClause("PK2=9 ");
+            queryBuilder.setGroupByClause("PK4,PK3,PK1");
+            queryBuilder.setOrderByClause("PK4,PK3,PK1");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 8, 7, 1, 1L }, { 12, 6, 1, 1L },
+                    { 13, 6, 1, 1L }, { 22, 5, 1, 1L } });
+
+            queryBuilder.setSelectExpression("PK1,PK3,PK4,COUNT(V)");
+            queryBuilder.setOrderByClause("PK1,PK3,PK4");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 1, 5, 22, 1L }, { 1, 6, 12, 1L },
+                    { 1, 6, 13, 1L }, { 1, 7, 8, 1L } });
+
+            queryBuilder.setSelectExpression("PK4,PK3,PK1,COUNT(V)");
+            queryBuilder.setOrderByClause("PK4 DESC,PK3 DESC,PK1 DESC");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 22, 5, 1, 1L }, { 13, 6, 1, 1L },
+                    { 12, 6, 1, 1L }, { 8, 7, 1, 1L } });
+
+            queryBuilder.setSelectExpression("PK1,PK3,PK4,COUNT(V)");
+            queryBuilder.setOrderByClause("PK1 DESC,PK3 DESC,PK4 DESC");
+            rs = executeQuery(conn, queryBuilder);
+            assertResultSet(rs, new Object[][] { { 1, 7, 8, 1L }, { 1, 6, 13, 1L },
+                    { 1, 6, 12, 1L }, { 1, 5, 22, 1L } });
+        }
+    }
+
+    public void mergeRegions(String testTableName) throws Exception {
+        Admin admin =
+                driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        List<RegionInfo> regions = admin.getRegions(TableName.valueOf(testTableName));
+
+        for (int i = 0; i < regions.size() - 1; i += 2) {
+            byte[][] regionsToMerge = new byte[2][];
+            regionsToMerge[0] = regions.get(i).getEncodedNameAsBytes();
+            regionsToMerge[1] = regions.get(i + 1).getEncodedNameAsBytes();
+            admin.mergeRegionsAsync(regionsToMerge, false).get();
+        }
+    }
+
+    private long createTableCase1(String baseTable, int saltBuckets, PDataType pkType1,
+            SortOrder pk1Order, PDataType pkType2, SortOrder pk2Order, PDataType pkType3,
+            SortOrder pk3Order) throws SQLException {
+
+        String pkType1Str = getType(pkType1);
+        String pkType2Str = getType(pkType2);
+        String pkType3Str = getType(pkType3);
+
+        try (Connection tenantConnection = DriverManager.getConnection(getUrl())) {
+            try (Statement cstmt = tenantConnection.createStatement()) {
+                String TABLE_TEMPLATE =
+                        "CREATE TABLE IF NOT EXISTS %s(ID1 %s not null,ID2 %s not null, "
+                        + "ID3 %s not null, ROW_ID VARCHAR, V VARCHAR CONSTRAINT pk "
+                        + " PRIMARY KEY (ID1 %s, ID2 %s, ID3 %s)) SALT_BUCKETS=%d ";
+                cstmt.execute(String.format(TABLE_TEMPLATE, baseTable, pkType1Str, pkType2Str,
+                    pkType3Str, pk1Order.name(), pk2Order.name(), pk3Order.name(), saltBuckets));
+            }
+        }
+
+        long nowTime = System.currentTimeMillis();
+        List<String> UPSERT_SQLS = new ArrayList<>();
+        UPSERT_SQLS.addAll(Arrays.asList(new String[] {
+                String.format(
+                    "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+                    baseTable, nowTime, 2.0, 3, "row0", "v1"),
+                String.format(
+                    "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+                    baseTable, nowTime + 1, 2.0, 3, "row1", "v2"),
+                String.format(
+                    "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+                    baseTable, nowTime + 1, 2.0, 5, "row2", "v3"),
+                String.format(
+                    "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+                    baseTable, nowTime + 2, 4.0, 5, "row3", "v4"),
+                String.format(
+                    "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+                    baseTable, nowTime + 3, 6.0, 7, "row4", "v5") }));
+
+        for (int i = 5; i < 100; i++) {
+            UPSERT_SQLS.add(String.format(
+                "UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, V) VALUES (%d, %f, %d, '%s', '%s')",
+                baseTable, nowTime + i, 10.0, 13, "row" + i, "v" + i));
+        }
+
+        try (Connection tenantConnection = DriverManager.getConnection(getUrl())) {
+            tenantConnection.setAutoCommit(true);
+            try (Statement ustmt = tenantConnection.createStatement()) {
+                for (String upsertSql : UPSERT_SQLS) {
+                    ustmt.execute(upsertSql);
+                }
+            }
+        }
+
+        return nowTime;
+    }
+
+    private void createTableCase2(String baseTable, int saltBuckets, PDataType pkType1,
+            SortOrder pk1Order, PDataType pkType2, SortOrder pk2Order, PDataType pkType3,
+            SortOrder pk3Order, PDataType pkType4, SortOrder pk4Order) throws SQLException {
+
+        String pkType1Str = getType(pkType1);
+        String pkType2Str = getType(pkType2);
+        String pkType3Str = getType(pkType3);
+        String pkType4Str = getType(pkType4);
+
+        try (Connection globalConnection = DriverManager.getConnection(getUrl())) {
+            try (Statement cstmt = globalConnection.createStatement()) {
+                String TABLE_TEMPLATE =
+                        "CREATE TABLE IF NOT EXISTS %s(PK1 %s not null,PK2 %s not null, "
+                        + " PK3 %s not null,PK4 %s not null, V INTEGER CONSTRAINT "
+                        + " pk PRIMARY KEY (PK1 %s, PK2 %s, PK3 %s, PK4 %s)) SALT_BUCKETS=%d ";
+                cstmt.execute(String.format(TABLE_TEMPLATE, baseTable, pkType1Str, pkType2Str,
+                    pkType3Str, pkType4Str, pk1Order.name(), pk2Order.name(), pk3Order.name(),
+                    pk4Order.name(), saltBuckets));
+            }
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,8,10,20,30)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,8,11,21,31)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,9,5 ,22,32)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,9,6 ,12,33)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,9,6 ,13,34)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (1,9,7 ,8,35)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (2,3,15,25,35)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (2,7,16,26,36)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (2,7,17,27,37)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (3,2,18,28,38)");
+            conn.createStatement().execute("UPSERT INTO " + baseTable + " VALUES (3,2,19,29,39)");
+            conn.commit();
+        }
+    }
+
+    private String getType(PDataType pkType) {
+        String pkTypeStr = "VARCHAR(25)";
+        switch (pkType.getSqlType()) {
+        case Types.VARCHAR:
+            pkTypeStr = "VARCHAR(25)";
+            break;
+        case Types.CHAR:
+            pkTypeStr = "CHAR(15)";
+            break;
+        case Types.DECIMAL:
+            pkTypeStr = "DECIMAL(8,2)";
+            break;
+        case Types.INTEGER:
+            pkTypeStr = "INTEGER";
+            break;
+        case Types.BIGINT:
+            pkTypeStr = "BIGINT";
+            break;
+        case Types.DATE:
+            pkTypeStr = "DATE";
+            break;
+        case Types.TIMESTAMP:
+            pkTypeStr = "TIMESTAMP";
+            break;
+        default:
+            pkTypeStr = "VARCHAR(25)";
+        }
+        return pkTypeStr;
+    }
+
+    @Test
+    public void testMergesWithWideGuidepostsAndWithStatsForParallelization() throws Exception {
+        testMerges(true, true);
+    }
+
+    @Test
+    public void testMergesWithWideGuidepostsAndWithoutStatsForParallelization() throws Exception {
+        testMerges(true, false);
+    }
+
+    @Test
+    public void testMergesWithoutWideGuidepostsAndWithStatsForParallelization() throws Exception {
+        testMerges(false, true);
+    }
+
+    @Test
+    public void testMergesWithoutWideGuidepostsAndWithoutStatsForParallelization()
+            throws Exception {
+        testMerges(false, false);
+    }
+
+    public void testMerges(boolean withWideGuideposts, boolean withStatsForParallelization)
+            throws Exception {
+        String tableName = generateUniqueName();
+
+        List<String> range =
+                IntStream.range(0, 100).boxed().map(s -> String.format("201912%03d", s))
+                        .collect(Collectors.toList());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.FALSE.toString());
+        if (!withStatsForParallelization) {
+            props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.FALSE.toString());
+        }
+        try (Connection connection = DriverManager.getConnection(getUrl(), props);
+                Statement statement = connection.createStatement();) {
+            statement.execute("CREATE TABLE " + tableName
+                    + " (c1 VARCHAR NOT NULL, c2 VARCHAR NOT NULL, c3 VARCHAR NOT NULL, v1 VARCHAR "
+                    + " CONSTRAINT pk PRIMARY KEY(c1,c2,c3)) SALT_BUCKETS=11");
+            for (String c1 : range) {
+                statement.execute(" upsert into " + tableName + " values('" + c1
+                        + "','HORTONWORKS_WEEKLY_TEST','v3','" + c1 + "')");
+            }
+            connection.commit();
+
+            if (withWideGuideposts) {
+                // This is way bigger than the regions, guaranteeing no guideposts inside the
+                // original regions
+                statement.execute(
+                    "UPDATE STATISTICS " + tableName + " SET GUIDE_POSTS_WIDTH = 1000000");
+            } else {
+                // The default 20 bytes for these tests
+                statement.execute(
+                    "UPDATE STATISTICS " + tableName);
+            }
+
+            mergeRegions(tableName);
+
+            for (String c1 : range) {
+                ResultSet rs =
+                        statement.executeQuery("select c1, c2, c3, v1 from " + tableName
+                                + " where c1='" + c1 + "' and c2 like '%HORTONWORKS_WEEKLY_TEST%'");
+                assertTrue(rs.next());
+                assertEquals(c1, rs.getString("c1"));
+                assertEquals(c1, rs.getString("v1"));
+                assertFalse(rs.next());
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 1502315f50..94c3f87840 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -245,7 +245,70 @@ public class ScanRanges {
         System.arraycopy(key, keyOffset, temp, 0, key.length - keyOffset);
         return temp;
     }
-    
+
+    // This variant adds synthetic scan boundaries at potentially missing salt bucket boundaries
+    // and won't return null Scans
+    public List<Scan> intersectScan(Scan scan, final byte[] originalStartKey,
+            final byte[] originalStopKey, final int keyOffset, byte[] splitPostfix, Integer buckets,
+            boolean crossesRegionBoundary) {
+        // FIXME Both the salted status and the pre-computed bucket list should be available in
+        // this object, but in some cases they get overwritten, so we cannot rely on them.
+        List<Scan> newScans = new ArrayList<Scan>();
+        if (buckets != null && buckets > 0) {
+            byte[] wrkStartKey = originalStartKey;
+            do {
+                boolean lastBucket = false;
+                byte[] nextBucketStart = null;
+                byte[] nextBucketByte = null;
+                if (wrkStartKey.length > 0 && Byte.toUnsignedInt(wrkStartKey[0]) >= buckets - 1) {
+                    lastBucket = true;
+                } else {
+                    // This includes the zero bytes from the minimum PK
+                    nextBucketStart = bucketEnd(wrkStartKey, splitPostfix);
+                    // This is the start of the next bucket in byte[], without the PK suffix
+                    nextBucketByte = new byte[] { nextBucketStart[0] };
+                }
+                if (lastBucket || Bytes.compareTo(originalStopKey, nextBucketStart) <= 0) {
+                    // either we don't need to add synthetic guideposts, or we already have, and
+                    // are at the last bucket of the original scan
+                    addIfNotNull(newScans, intersectScan(scan, wrkStartKey, originalStopKey,
+                        keyOffset, crossesRegionBoundary));
+                    break;
+                }
+                // This is where we add the synthetic guidepost.
+                // We skip [nextBucketByte, nextBucketStart), but it's guaranteed that there are no
+                // rows there.
+                addIfNotNull(newScans,
+                    intersectScan(scan, wrkStartKey, nextBucketByte, keyOffset, false));
+                wrkStartKey = nextBucketStart;
+            } while (true);
+        } else {
+            // Definitely not crossing buckets
+            addIfNotNull(newScans, intersectScan(scan, originalStartKey, originalStopKey, keyOffset,
+                crossesRegionBoundary));
+        }
+        return newScans;
+    }
+
+    private void addIfNotNull(List<Scan> scans, Scan newScan) {
+        if (newScan != null) {
+            scans.add(newScan);
+        }
+    }
+
+    // The split (presplit for salted tables) code extends the split point to the minimum PK length.
+    // Adding the same postfix here avoids creating an extra [n,n\x00\x00\x00..\x00) scan for each
+    // bucket.
+    private byte[] bucketEnd(byte[] key, byte[] splitPostfix) {
+        byte startByte = key.length > 0 ? key[0] : 0;
+        int nextBucket = Byte.toUnsignedInt(startByte) + 1;
+        byte[] bucketEnd = new byte[splitPostfix.length + 1];
+        bucketEnd[0] = (byte) nextBucket;
+        System.arraycopy(splitPostfix, 0, bucketEnd, 1, splitPostfix.length);
+        return bucketEnd;
+    }
+
+    // TODO split this for normal, salted and local index variants
     public Scan intersectScan(Scan scan, final byte[] originalStartKey, final byte[] originalStopKey, final int keyOffset, boolean crossesRegionBoundary) {
         byte[] startKey = originalStartKey;
         byte[] stopKey = originalStopKey;
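
The bucketEnd() helper added above increments the leading salt byte and appends
the same zero-fill postfix that the pre-split code uses to reach the minimum PK
length, so the synthetic scan boundaries agree byte-for-byte with the original
region boundaries. A standalone sketch of that arithmetic, assuming an
illustrative 3-byte postfix:

    import java.util.Arrays;

    // Mirrors the bucketEnd() arithmetic: take the leading salt byte of the
    // current key, increment it, and append the zero-fill postfix. The 3-byte
    // postfix is an assumption for illustration.
    public class BucketEndSketch {
        static byte[] bucketEnd(byte[] key, byte[] splitPostfix) {
            byte startByte = key.length > 0 ? key[0] : 0;
            int nextBucket = Byte.toUnsignedInt(startByte) + 1;
            byte[] end = new byte[splitPostfix.length + 1];
            end[0] = (byte) nextBucket;
            System.arraycopy(splitPostfix, 0, end, 1, splitPostfix.length);
            return end;
        }

        public static void main(String[] args) {
            byte[] postfix = new byte[3]; // assumed minimum-PK zero fill
            byte[] currentKey = { 0x04, 'a', 'b', 'c' }; // bucket 4, some PK bytes
            System.out.println(Arrays.toString(bucketEnd(currentKey, postfix)));
            // -> [5, 0, 0, 0]: the scan for bucket 4 stops exactly where bucket
            // 5's pre-split region starts, so no scan straddles a boundary.
        }
    }
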
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 94b1972c9d..21011c24b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -155,6 +155,7 @@ public class WhereOptimizer {
             keySlots = whereClause.accept(visitor);
 
             if (keySlots == null && (tenantId == null || !table.isMultiTenant()) && table.getViewIndexId() == null && !minOffset.isPresent()) {
+                // FIXME this overwrites salting info in the scanRange
                 context.setScanRanges(ScanRanges.EVERYTHING);
                 return whereClause;
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 2c6885bbf5..cecab8626f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -39,6 +39,7 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.sql.SQLException;
 import java.util.*;
+import java.util.Arrays;
 import java.util.Map.Entry;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -671,24 +672,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 Math.min(estimate.lastUpdated,
                     gps.getGuidePostTimestamps()[guideIndex]);
     }
-    
-    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan,
-            byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) {
-        boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
-        if (scan != null) {
-            if (regionLocation.getServerName() != null) {
-                scan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER, regionLocation.getServerName().getVersionedBytes());
-            }
-            if (useStatsForParallelization || crossedRegionBoundary) {
-                scans.add(scan);
-            }
-        }
-        if (startNewScan && !scans.isEmpty()) {
-            parallelScans.add(scans);
-            scans = Lists.newArrayListWithExpectedSize(1);
-        }
-        return scans;
-    }
 
     private List<List<Scan>> getParallelScans() throws SQLException {
         // If the scan boundaries are not matching with scan in context that means we need to get
@@ -718,18 +701,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         if (scan.getStopRow().length > 0) {
             stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), scan.getStopRow()));
         }
-        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
-        List<Scan> scans = Lists.newArrayListWithExpectedSize(2);
+        ParallelScansCollector parallelScans = new ParallelScansCollector(scanGrouper);
         while (regionIndex <= stopIndex) {
             HRegionLocation regionLocation = regionLocations.get(regionIndex);
             RegionInfo regionInfo = regionLocation.getRegion();
             Scan newScan = ScanUtil.newScan(scan);
-            byte[] endKey;
-            if (regionIndex == stopIndex) {
-                endKey = scan.getStopRow();
-            } else {
-                endKey = regionBoundaries.get(regionIndex);
-            }
             if (ScanUtil.isLocalIndex(scan)) {
                 ScanUtil.setLocalIndexAttributes(newScan, 0, regionInfo.getStartKey(),
                     regionInfo.getEndKey(), newScan.getAttribute(SCAN_START_ROW_SUFFIX),
@@ -744,13 +720,14 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     newScan.withStopRow(regionInfo.getEndKey());
                 }
             }
-            scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
+            if (regionLocation.getServerName() != null) {
+                newScan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER,
+                    regionLocation.getServerName().getVersionedBytes());
+            }
+            parallelScans.addNewScan(plan, newScan, true);
             regionIndex++;
         }
-        if (!scans.isEmpty()) { // Add any remaining scans
-            parallelScans.add(scans);
-        }
-        return parallelScans;
+        return parallelScans.getParallelScans();
     }
 
     private static class GuidePostEstimate {
@@ -924,7 +901,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         // return true if we've clipped the rowKey
         return maxOffset != offset;
     }
-    
+
     /**
      * Compute the list of parallel scans to run for a given query. The inner scans
      * may be concatenated together directly, while the other ones may need to be
@@ -962,6 +939,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     Long.MAX_VALUE, false);
             return parallelScans;
         }
+        byte[] sampleProcessedSaltByte =
+                SchemaUtil.processSplit(new byte[] { 0 }, table.getPKColumns());
+        byte[] splitPostfix =
+                Arrays.copyOfRange(sampleProcessedSaltByte, 1, sampleProcessedSaltByte.length);
         List<HRegionLocation> regionLocations = getRegionBoundaries(scanGrouper);
         List<byte[]> regionBoundaries = toBoundaries(regionLocations);
         boolean isSalted = table.getBucketNum() != null;
@@ -1014,15 +995,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 stopKey = regionLocations.get(stopIndex).getRegion().getEndKey();
             }
         }
-        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+        ParallelScansCollector parallelScanCollector = new ParallelScansCollector(scanGrouper);
         
         ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
         
         int gpsSize = gps.getGuidePostsCount();
-        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
         int keyOffset = 0;
         ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
-        List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
         ImmutableBytesWritable guidePosts = gps.getGuidePosts();
         ByteArrayInputStream stream = null;
         DataInput input = null;
@@ -1118,13 +1097,28 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 byte[] initialKeyBytes = currentKeyBytes;
                 int gpsComparedToEndKey = -1;
                 boolean everNotDelayed = false;
-                while (intersectWithGuidePosts && (endKey.length == 0 || (gpsComparedToEndKey=currentGuidePost.compareTo(endKey)) <= 0)) {
-                    Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
-                        false);
-                    if (newScan != null) {
-                        ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
-                            regionInfo.getStartKey(), regionInfo.getEndKey(),
-                            newScan.getStartRow(), newScan.getStopRow());
+                while (intersectWithGuidePosts && (endKey.length == 0
+                        || (gpsComparedToEndKey = currentGuidePost.compareTo(endKey)) <= 0)) {
+                    List<Scan> newScans =
+                            scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes,
+                                keyOffset, splitPostfix, getTable().getBucketNum(),
+                                gpsComparedToEndKey == 0);
+                    if (useStatsForParallelization) {
+                        for (int newScanIdx = 0; newScanIdx < newScans.size(); newScanIdx++) {
+                            Scan newScan = newScans.get(newScanIdx);
+                            ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
+                                regionInfo.getStartKey(), regionInfo.getEndKey(),
+                                newScan.getStartRow(), newScan.getStopRow());
+                            if (regionLocation.getServerName() != null) {
+                                newScan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER,
+                                    regionLocation.getServerName().getVersionedBytes());
+                            }
+                            boolean lastOfNew = newScanIdx == newScans.size() - 1;
+                            parallelScanCollector.addNewScan(plan, newScan,
+                                gpsComparedToEndKey == 0 && lastOfNew);
+                        }
+                    }
+                    if (newScans.size() > 0) {
                         // If we've delaying adding estimates, add the previous
                         // gp estimates now that we know they are in range.
                         if (delayAddingEst) {
@@ -1139,7 +1133,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         delayAddingEst = false;
                     }
                     everNotDelayed |= !delayAddingEst;
-                    scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
                     currentKeyBytes = currentGuidePostBytes;
                     try {
                         currentGuidePost = PrefixByteCodec.decode(decoder, input);
@@ -1158,10 +1151,21 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                      */
                     currentKeyBytes = initialKeyBytes;
                 }
-                Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true);
-                if (newScan != null) {
+                List<Scan> newScans =
+                        scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset,
+                            splitPostfix, getTable().getBucketNum(), true);
+                for (int newScanIdx = 0; newScanIdx < newScans.size(); newScanIdx++) {
+                    Scan newScan = newScans.get(newScanIdx);
                     ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
                         regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
+                    if (regionLocation.getServerName() != null) {
+                        newScan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER,
+                            regionLocation.getServerName().getVersionedBytes());
+                    }
+                    boolean lastOfNew = newScanIdx == newScans.size() - 1;
+                    parallelScanCollector.addNewScan(plan, newScan, lastOfNew);
+                }
+                if (newScans.size() > 0) {
                     // Boundary case of no GP in region after delaying adding of estimates
                     if (!gpsInThisRegion && delayAddingEst) {
                         updateEstimates(gps, guideIndex-1, estimates);
@@ -1171,7 +1175,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 } else if (!gpsInThisRegion) {
                     delayAddingEst = false;
                 }
-                scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
                 currentKeyBytes = endKey;
                 // We have a guide post in the region if the above loop was entered
                 // or if the current key is less than the region end key (since the loop
@@ -1183,7 +1186,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     ( regionIndex == startRegionIndex && gpsForFirstRegion ) || // GP in first region (before start key)
                     ( gpsAfterStopKey = ( regionIndex == stopIndex && intersectWithGuidePosts && // GP in last region (after stop key)
                             ( endRegionKey.length == 0 || // then check if gp is in the region
-                            currentGuidePost.compareTo(endRegionKey) < 0)  ) );            
+                            currentGuidePost.compareTo(endRegionKey) < 0)));
                 if (gpsAfterStopKey) {
                     // If gp after stop key, but still in last region, track min ts as fallback 
                     fallbackTs =
@@ -1192,16 +1195,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
                 regionIndex++;
             }
-            if (!scans.isEmpty()) { // Add any remaining scans
-                parallelScans.add(scans);
-            }
-            generateEstimates(scanRanges, table, gps, emptyGuidePost, parallelScans, estimates,
+            generateEstimates(scanRanges, table, gps, emptyGuidePost, parallelScanCollector.getParallelScans(), estimates,
                     fallbackTs, gpsAvailableForAllRegions);
         } finally {
             if (stream != null) Closeables.closeQuietly(stream);
         }
-        sampleScans(parallelScans,this.plan.getStatement().getTableSamplingRate());
-        return parallelScans;
+        sampleScans(parallelScanCollector.getParallelScans(), this.plan.getStatement().getTableSamplingRate());
+        return parallelScanCollector.getParallelScans();
     }
 
     private void generateEstimates(ScanRanges scanRanges, PTable table, GuidePostsInfo gps,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
index 646c0bb972..2064619b1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.List;
 
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.QueryPlan;
@@ -28,7 +27,6 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ScanUtil;
 
 /**
@@ -37,38 +35,42 @@ import org.apache.phoenix.util.ScanUtil;
  */
 public class DefaultParallelScanGrouper implements ParallelScanGrouper {
 
-  private static DefaultParallelScanGrouper INSTANCE = new DefaultParallelScanGrouper();
+    private static DefaultParallelScanGrouper INSTANCE = new DefaultParallelScanGrouper();
 
+    public DefaultParallelScanGrouper() {
+    }
 
-  public DefaultParallelScanGrouper() {
-  }
-
-  public static DefaultParallelScanGrouper getInstance() {
-    return INSTANCE;
-  }
-
+    public static DefaultParallelScanGrouper getInstance() {
+        return INSTANCE;
+    }
 
-  @Override
-  public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary) {
-    PTable table = plan.getTableRef().getTable();
-    boolean startNewScanGroup = false;
-    if (!plan.isRowKeyOrdered()) {
-      startNewScanGroup = true;
-    } else if (crossedRegionBoundary) {
-      if (table.getIndexType() == IndexType.LOCAL) {
-        startNewScanGroup = true;
-      } else if (table.getBucketNum() != null) {
-        startNewScanGroup = scans.isEmpty() ||
-            ScanUtil.crossesPrefixBoundary(startKey,
-                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES),
-                SaltingUtil.NUM_SALTING_BYTES);
-      }
+    /**
+     * Returns true if the scan with the startKey is to be the first of a new batch
+     */
+    @Override
+    public boolean shouldStartNewScan(QueryPlan plan, Scan lastScan, byte[] startKey,
+            boolean crossesRegionBoundary) {
+        PTable table = plan.getTableRef().getTable();
+        if (lastScan == null) {
+            return false;
+        } else if (!plan.isRowKeyOrdered()) {
+            return true;
+        } else if (crossesRegionBoundary && table.getIndexType() == IndexType.LOCAL) {
+            return true;
+        } else if (table.getBucketNum() != null) {
+            return crossesRegionBoundary
+                    || ScanUtil.crossesPrefixBoundary(startKey,
+                        ScanUtil.getPrefix(lastScan.getStartRow(),
+                            SaltingUtil.NUM_SALTING_BYTES),
+                        SaltingUtil.NUM_SALTING_BYTES);
+        } else {
+            return false;
+        }
     }
-    return startNewScanGroup;
-  }
 
-  @Override
-  public List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName) throws SQLException{
-    return context.getConnection().getQueryServices().getAllTableRegions(tableName);
-  }
+    @Override
+    public List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName)
+            throws SQLException {
+        return context.getConnection().getQueryServices().getAllTableRegions(tableName);
+    }
 }
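
With the rewrite above, a salted table keeps consecutive scans in one batch
only while the leading salt byte of the new start key matches the previous
scan's, checked through ScanUtil.crossesPrefixBoundary. A reduced sketch of
that prefix check (NUM_SALTING_BYTES is 1 in Phoenix; the helper below is
illustrative, not the ScanUtil implementation):

    import java.util.Arrays;

    // Sketch of the salt-prefix comparison behind shouldStartNewScan:
    // scans share a batch only while their leading salt byte(s) match.
    public class PrefixBoundarySketch {
        static final int NUM_SALTING_BYTES = 1;

        static boolean crossesPrefixBoundary(byte[] startKey, byte[] lastPrefix) {
            byte[] prefix = Arrays.copyOf(startKey, NUM_SALTING_BYTES);
            return !Arrays.equals(prefix, lastPrefix);
        }

        public static void main(String[] args) {
            byte[] lastPrefix = Arrays.copyOf(new byte[] { 0x02, 'x' }, NUM_SALTING_BYTES);
            System.out.println(crossesPrefixBoundary(new byte[] { 0x02, 'z' }, lastPrefix)); // false
            System.out.println(crossesPrefixBoundary(new byte[] { 0x03, 'a' }, lastPrefix)); // true -> new batch
        }
    }
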
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index 1af96c37c6..088432ae53 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -56,9 +56,9 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
     MapReduceParallelScanGrouper() {}
 
 	@Override
-	public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans,
-			byte[] startKey, boolean crossedRegionBoundary) {
-		return !plan.isRowKeyOrdered() || crossedRegionBoundary;
+	public boolean shouldStartNewScan(QueryPlan plan, Scan lastScan,
+			byte[] startKey, boolean crossesRegionBoundary) {
+		return (!plan.isRowKeyOrdered() || crossesRegionBoundary) && lastScan != null;
 	}
 
 	@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
index 5e237a5662..e53bc0a3b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
@@ -30,16 +30,16 @@ import org.apache.phoenix.compile.StatementContext;
  */
 public interface ParallelScanGrouper {
 
-	/**
-	 * Determines whether to create a new group of parallel scans.
-	 *
-	 * @param scans						current scan group
-	 * @param plan						current query plan
-	 * @param startKey					start key of scan
-	 * @param crossedRegionBoundary		whether we crossed a region boundary
-	 * @return true if we should create a new group of scans
-	 */
-	boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary);
+    /**
+     * Determines whether to create a new group of parallel scans.
+     * @param plan current query plan
+     * @param lastScan the last scan processed
+     * @param startKey start key of the new scan
+     * @param crossesRegionBoundary whether startKey is in a different region than lastScan
+     * @return true if we should create a new group of scans, starting with the scan whose start
+     *         key we passed as startKey
+     */
+	boolean shouldStartNewScan(QueryPlan plan, Scan lastScan, byte[] startKey, boolean crossesRegionBoundary);
 
 	List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName) throws SQLException;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java
new file mode 100644
index 0000000000..e1c99e2836
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Stores some state to help build the parallel Scans structure
+ */
+public class ParallelScansCollector {
+
+    private final ParallelScanGrouper grouper;
+    private boolean lastScanCrossedRegionBoundary = false;
+    private final List<List<Scan>> parallelScans = new ArrayList<>();
+    private List<Scan> lastBatch = new ArrayList<>();
+    private Scan lastScan = null;
+
+    public ParallelScansCollector(ParallelScanGrouper grouper) {
+        this.grouper = grouper;
+        parallelScans.add(lastBatch);
+    }
+
+    public void addNewScan(QueryPlan plan, Scan newScan, boolean crossesRegionBoundary) {
+        if (grouper.shouldStartNewScan(plan, lastScan, newScan.getStartRow(),
+            lastScanCrossedRegionBoundary)) {
+            lastBatch = new ArrayList<>();
+            parallelScans.add(lastBatch);
+        }
+        lastBatch.add(newScan);
+
+        lastScanCrossedRegionBoundary = crossesRegionBoundary;
+        lastScan = newScan;
+    }
+
+    public List<List<Scan>> getParallelScans() {
+        return parallelScans;
+    }
+}
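
ParallelScansCollector replaces the scans/parallelScans bookkeeping that the
removed addNewScan() did inline in BaseResultIterators. Stripped of the
Phoenix types, the pattern is a stateful batcher driven by a split predicate;
a generic, hypothetical sketch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiPredicate;

    // Items are appended to the current batch until the predicate, given the
    // last item and the next one, says a fresh batch must start.
    public class BatchCollector<T> {
        private final BiPredicate<T, T> startNewBatch;
        private final List<List<T>> batches = new ArrayList<>();
        private List<T> lastBatch = new ArrayList<>();
        private T lastItem = null;

        public BatchCollector(BiPredicate<T, T> startNewBatch) {
            this.startNewBatch = startNewBatch;
            batches.add(lastBatch);
        }

        public void add(T item) {
            if (lastItem != null && startNewBatch.test(lastItem, item)) {
                lastBatch = new ArrayList<>();
                batches.add(lastBatch);
            }
            lastBatch.add(item);
            lastItem = item;
        }

        public List<List<T>> getBatches() {
            return batches;
        }

        public static void main(String[] args) {
            // Group consecutive ints into batches sharing the same tens digit.
            BatchCollector<Integer> c =
                    new BatchCollector<>((last, next) -> last / 10 != next / 10);
            for (int i : new int[] { 1, 3, 9, 12, 15, 27 }) {
                c.add(i);
            }
            System.out.println(c.getBatches()); // [[1, 3, 9], [12, 15], [27]]
        }
    }

In the real class the predicate role is played by
ParallelScanGrouper.shouldStartNewScan(), fed with the last scan and whether
it crossed a region boundary.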
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 8272eab92e..fbca85b3b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -38,6 +38,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -597,7 +598,8 @@ public class SchemaUtil {
     }
     
     // Given the splits and the rowKeySchema, find out the keys that 
-    public static byte[][] processSplits(byte[][] splits, LinkedHashSet<PColumn> pkColumns, Integer saltBucketNum, boolean defaultRowKeyOrder) throws SQLException {
+    public static byte[][] processSplits(byte[][] splits, Collection<PColumn> pkColumns,
+            Integer saltBucketNum, boolean defaultRowKeyOrder) throws SQLException {
         // FIXME: shouldn't this return if splits.length == 0?
         if (splits == null) return null;
         // We do not accept user specified splits if the table is salted and we specify defaultRowKeyOrder. In this case,
@@ -611,6 +613,8 @@ public class SchemaUtil {
         }
         byte[][] newSplits = new byte[splits.length][];
         for (int i=0; i<splits.length; i++) {
+            // Salted tables don't need this processing, but the Split policy uses it for all
+            // tables, so it makes sense to apply it to them as well for consistency
             newSplits[i] = processSplit(splits[i], pkColumns); 
         }
         return newSplits;
@@ -618,7 +622,7 @@ public class SchemaUtil {
 
 // Go through each slot in the schema and try to match it with the split byte array. If the split
 // does not conform to the schema, extend its length to match the schema.
-    private static byte[] processSplit(byte[] split, LinkedHashSet<PColumn> pkColumns) {
+    public static byte[] processSplit(byte[] split, Collection<PColumn> pkColumns) {
         int pos = 0, offset = 0, maxOffset = split.length;
         Iterator<PColumn> iterator = pkColumns.iterator();
         while (pos < pkColumns.size()) {
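
processSplit() becomes public here so that BaseResultIterators can pass in a
bare one-byte salt key and derive splitPostfix from the padded result (see the
sampleProcessedSaltByte code earlier in this diff). A sketch of what the
padding achieves, assuming zero-fill across fixed-width PK columns; the widths
below are made up for illustration:

    import java.util.Arrays;

    // Hypothetical stand-in for processSplit's length extension: a short split
    // point is zero-padded until it covers the fixed-width PK columns.
    public class ProcessSplitSketch {
        static byte[] padSplit(byte[] split, int[] fixedPkWidths) {
            int minLen = 0;
            for (int w : fixedPkWidths) {
                minLen += w;
            }
            return split.length >= minLen ? split : Arrays.copyOf(split, minLen);
        }

        public static void main(String[] args) {
            byte[] saltOnlySplit = { 0x01 };
            // e.g. salt byte + two INTEGER PK columns -> 9 bytes (assumed layout)
            System.out.println(Arrays.toString(padSplit(saltOnlySplit, new int[] { 1, 4, 4 })));
            // -> [1, 0, 0, 0, 0, 0, 0, 0, 0]; dropping the salt byte leaves the
            // splitPostfix that BaseResultIterators passes to intersectScan.
        }
    }
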
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index fd8f0903b0..4c8a1e2ade 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -160,6 +160,7 @@ import org.apache.phoenix.util.ConfigUtil;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryBuilder;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -748,6 +749,12 @@ public abstract class BaseTest {
         createTestTable(url, ddl, splits, ts);
     }
 
+    public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+        PreparedStatement statement = conn.prepareStatement(queryBuilder.build());
+        ResultSet rs = statement.executeQuery();
+        return rs;
+    }
+
     private static AtomicInteger NAME_SUFFIX = new AtomicInteger(0);
     private static final int MAX_SUFFIX_VALUE = 1000000;
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 60e0a108bb..fe0d8d1b2c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1274,8 +1274,7 @@ public class TestUtil {
                 if (realValue == null) {
                     assertNull("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "]", expectedValue);
                 } else {
-                    assertEquals("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "],realValue:[" +
-                            realValue + "],expectedValue:[" + expectedValue + "]",
+                    assertEquals("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "]",
                         expectedValue,
                         realValue
                     );