Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/04 05:50:58 UTC

[1/6] PHOENIX-1251 Salted queries with range scan become full table scans

Repository: phoenix
Updated Branches:
  refs/heads/3.0 d3e6a9fa2 -> 80e218c24
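
For context, the symptom being fixed: on a salted table, a range predicate on the leading row key column could compile to a full table scan instead of a per-bucket range scan. A minimal sketch of observing the plan over JDBC, assuming a Phoenix server at localhost; the table and column names (SALTED_T, K, V) are illustrative, not from the patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SaltedRangeScanExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                // SALT_BUCKETS prepends one salt byte to every row key,
                // spreading writes across 4 buckets.
                stmt.execute("CREATE TABLE IF NOT EXISTS SALTED_T "
                        + "(K VARCHAR PRIMARY KEY, V VARCHAR) SALT_BUCKETS = 4");
                // Before the fix this could show a FULL SCAN; with it, the plan
                // is a range scan per salt bucket, merge-sorted on the client.
                ResultSet rs = stmt.executeQuery(
                        "EXPLAIN SELECT V FROM SALTED_T WHERE K >= 'a' AND K < 'b'");
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }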


http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
new file mode 100644
index 0000000..fd22e47
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+public class QueryPlanTest extends BaseConnectionlessQueryTest {
+    
+    @Test
+    public void testExplainPlan() throws Exception {
+        String[] queryPlans = new String[] {
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000001','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000005'] - ['000000000000001','000000000000008']",
+
+                "SELECT host FROM PTSDB3 WHERE host IN ('na1', 'na2','na3')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 KEYS OVER PTSDB3 [~'na3'] - [~'na1']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY",
+
+                "SELECT host FROM PTSDB WHERE inst IS NULL AND host IS NOT NULL AND date >= to_date('2013-01-01')",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [null,not null]\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY AND DATE >= '2013-01-01 00:00:00.000'",
+
+                // Since inst IS NOT NULL is unbounded, we won't continue optimizing
+                "SELECT host FROM PTSDB WHERE inst IS NOT NULL AND host IS NULL AND date >= to_date('2013-01-01')",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [not null]\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY AND (HOST IS NULL AND DATE >= '2013-01-01 00:00:00.000')",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id = '000000000000002' AND x_integer = 2 AND a_integer < 5 ",
+                "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER ATABLE\n" + 
+                "    SERVER FILTER BY (X_INTEGER = 2 AND A_INTEGER < 5)",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) <= ('000000000000001','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000003'] - ['000000000000001','000000000000006']",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id > '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000003','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000003','000000000000005'] - [*]\n" + 
+                "    SERVER FILTER BY (ENTITY_ID > '000000000000002' AND ENTITY_ID < '000000000000008')",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id >= '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000000','000000000000005') ",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000002'] - ['000000000000001','000000000000008']",
+
+                "SELECT * FROM atable",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE",
+
+                "SELECT inst,host FROM PTSDB WHERE inst IN ('na1', 'na2','na3') AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 6 RANGES OVER PTSDB ['na1','a','2013-01-01'] - ['na3','b','2013-01-02']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY",
+
+                "SELECT inst,host FROM PTSDB WHERE inst LIKE 'na%' AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 RANGES OVER PTSDB ['na','a','2013-01-01'] - ['nb','b','2013-01-02']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY",
+
+                "SELECT count(*) FROM atable",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
+                "    SERVER AGGREGATE INTO SINGLE ROW",
+
+                "SELECT count(*) FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003            '] - ['000000000000001','004            ']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
+                "    SERVER AGGREGATE INTO SINGLE ROW",
+
+                "SELECT a_string FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003            '] - ['000000000000001','004            ']",
+
+                "SELECT count(1) FROM atable GROUP BY a_string",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" +
+                "CLIENT MERGE SORT",
+
+                "SELECT count(1) FROM atable GROUP BY a_string LIMIT 5",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + 
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" + 
+                "CLIENT MERGE SORT\n" + 
+                "CLIENT 5 ROW LIMIT",
+
+                "SELECT a_string FROM atable ORDER BY a_string DESC LIMIT 3",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" + 
+                "    SERVER TOP 3 ROWS SORTED BY [A_STRING DESC]\n" + 
+                "CLIENT MERGE SORT",
+
+                "SELECT count(1) FROM atable GROUP BY a_string,b_string HAVING max(a_string) = 'a'",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT FILTER BY MAX(A_STRING) = 'a'",
+
+                "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY ROUND(a_time,'HOUR',2),entity_id HAVING max(a_string) = 'a'",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER FILTER BY A_INTEGER = 1\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [ENTITY_ID, ROUND(A_TIME)]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT FILTER BY MAX(A_STRING) = 'a'",
+
+                "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY a_string,b_string HAVING max(a_string) = 'a' ORDER BY b_string",
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE\n" +
+                "    SERVER FILTER BY A_INTEGER = 1\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT FILTER BY MAX(A_STRING) = 'a'\n" +
+                "CLIENT SORTED BY [B_STRING]",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id != '000000000000002' AND x_integer = 2 AND a_integer < 5 LIMIT 10",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
+                "    SERVER FILTER BY (ENTITY_ID != '000000000000002' AND X_INTEGER = 2 AND A_INTEGER < 5)\n" + 
+                "    SERVER 10 ROW LIMIT\n" + 
+                "CLIENT 10 ROW LIMIT",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string ASC NULLS FIRST LIMIT 10",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
+                "    SERVER TOP 10 ROWS SORTED BY [A_STRING]\n" + 
+                "CLIENT MERGE SORT",
+
+                "SELECT max(a_integer) FROM atable WHERE organization_id = '000000000000001' GROUP BY organization_id,entity_id,ROUND(a_date,'HOUR') ORDER BY entity_id NULLS LAST LIMIT 10",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [ORGANIZATION_ID, ENTITY_ID, ROUND(A_DATE)]\n" + 
+                "CLIENT MERGE SORT\n" + 
+                "CLIENT TOP 10 ROWS SORTED BY [ENTITY_ID NULLS LAST]",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string DESC NULLS LAST LIMIT 10",
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
+                "    SERVER TOP 10 ROWS SORTED BY [A_STRING DESC NULLS LAST]\n" + 
+                "CLIENT MERGE SORT",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id IN ('000000000000001', '000000000000005')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 KEYS OVER ATABLE ['000000000000001'] - ['000000000000005']",
+
+                "SELECT a_string,b_string FROM atable WHERE organization_id IN ('00D000000000001', '00D000000000005') AND entity_id IN('00E00000000000X','00E00000000000Z')",
+                "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 4 KEYS OVER ATABLE",
+
+                "SELECT inst,host FROM PTSDB WHERE REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1', 'na2','na3')",
+                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 RANGES OVER PTSDB ['na1'] - ['na4']\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY AND REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1','na2','na3')",
+
+        };
+        for (int i = 0; i < queryPlans.length; i+=2) {
+            String query = queryPlans[i];
+            String plan = queryPlans[i+1];
+            Properties props = new Properties();
+            // Override date format so we don't have a bunch of zeros
+            props.setProperty(QueryServices.DATE_FORMAT_ATTRIB, "yyyy-MM-dd");
+            Connection conn = DriverManager.getConnection(getUrl(), props);
+            try {
+                Statement statement = conn.createStatement();
+                ResultSet rs = statement.executeQuery("EXPLAIN " + query);
+                // TODO: figure out a way of verifying that the query isn't run during explain execution
+                assertEquals((i/2+1) + ") " + query, plan, QueryUtil.getExplainPlan(rs));
+            } finally {
+                conn.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 89bbb6d..67fa531 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -149,6 +149,7 @@ public class TestUtil {
     public static final String ATABLE_SCHEMA_NAME = "";
     public static final String BTABLE_NAME = "BTABLE";
     public static final String STABLE_NAME = "STABLE";
+    public static final String STABLE_PK_NAME = "ID";
     public static final String STABLE_SCHEMA_NAME = "";
     public static final String GROUPBYTEST_NAME = "GROUPBYTEST";
     public static final String CUSTOM_ENTITY_DATA_FULL_NAME = "CORE.CUSTOM_ENTITY_DATA";
@@ -441,4 +442,44 @@ public class TestUtil {
             upsertRow(conn, "DESC", i, inputList.get(i));
         }
     }
+    
+    public static List<KeyRange> getAllSplits(Connection conn, String tableName) throws SQLException {
+        return getSplits(conn, tableName, null, null, null, null);
+    }
+    
+    public static List<KeyRange> getAllSplits(Connection conn, String tableName, String where) throws SQLException {
+        return getSplits(conn, tableName, null, null, null, where);
+    }
+    
+    public static List<KeyRange> getSplits(Connection conn, String tableName, String pkCol, byte[] lowerRange, byte[] upperRange, String whereClauseSuffix) throws SQLException {
+        String whereClauseStart = 
+                (lowerRange == null && upperRange == null ? "" : 
+                    " WHERE " + ((lowerRange != null ? (pkCol + " >= ? " + (upperRange != null ? " AND " : "")) : "") 
+                              + (upperRange != null ? (pkCol + " < ?") : "" )));
+        String whereClause = whereClauseSuffix == null ? whereClauseStart : whereClauseStart.length() == 0 ? (" WHERE " + whereClauseSuffix) : (whereClauseStart + " AND " + whereClauseSuffix);
+        String query = "SELECT COUNT(*) FROM " + tableName + whereClause;
+        PhoenixPreparedStatement pstmt = conn.prepareStatement(query).unwrap(PhoenixPreparedStatement.class);
+        if (lowerRange != null) {
+            pstmt.setBytes(1, lowerRange);
+        }
+        if (upperRange != null) {
+            pstmt.setBytes(lowerRange != null ? 2 : 1, upperRange);
+        }
+        pstmt.execute();
+        List<KeyRange> keyRanges = pstmt.getQueryPlan().getSplits();
+        return keyRanges;
+    }
+    
+    public static List<KeyRange> getSplits(Connection conn, byte[] lowerRange, byte[] upperRange) throws SQLException {
+        return getSplits(conn, STABLE_NAME, STABLE_PK_NAME, lowerRange, upperRange, null);
+    }
+
+    public static List<KeyRange> getAllSplits(Connection conn) throws SQLException {
+        return getAllSplits(conn, STABLE_NAME);
+    }
+
+    public static void analyzeTable(Connection conn, String tableName) throws IOException, SQLException {
+        String query = "ANALYZE " + tableName;
+        conn.createStatement().execute(query);
+    }
 }
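
A hedged sketch of how an integration test might use the new TestUtil helpers; the table name and expected split count are illustrative, and getUrl() is assumed to come from the test base class:

    import static org.junit.Assert.assertEquals;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.List;

    import org.apache.phoenix.query.KeyRange;
    import org.apache.phoenix.util.TestUtil;

    // ... inside a test method:
    Connection conn = DriverManager.getConnection(getUrl());
    try {
        // ANALYZE collects stats guideposts, so getAllSplits reflects
        // stats-driven parallelization rather than just region boundaries.
        TestUtil.analyzeTable(conn, "MY_TABLE");
        List<KeyRange> splits = TestUtil.getAllSplits(conn, "MY_TABLE");
        assertEquals(3, splits.size()); // illustrative count
    } finally {
        conn.close();
    }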

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
index 4326876..bd1df97 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,13 +35,9 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.pig.PhoenixPigConfiguration;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -91,22 +86,8 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR
         Preconditions.checkNotNull(qplan);
         Preconditions.checkNotNull(splits);
         final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
-        final StatementContext context = qplan.getContext();
-        final TableRef tableRef = qplan.getTableRef();
-        for (KeyRange split : splits) {
-            final Scan splitScan = new Scan(context.getScan());
-            if (tableRef.getTable().getBucketNum() != null) {
-                KeyRange minMaxRange = context.getMinMaxRange();
-                if (minMaxRange != null) {
-                    minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
-                    split = split.intersect(minMaxRange);
-                }
-            }
-            // as the intersect code sets the actual start and stop row within the passed splitScan, we are fetching it back below.
-            if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), context.getScanRanges().useSkipScanFilter())) {
-                final PhoenixInputSplit inputSplit = new PhoenixInputSplit(KeyRange.getKeyRange(splitScan.getStartRow(), splitScan.getStopRow()));
-                psplits.add(inputSplit);     
-            }
+        for (KeyRange split : qplan.getSplits()) {
+            psplits.add(new PhoenixInputSplit(split));
         }
         return psplits;
     }
@@ -147,6 +128,8 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR
                 final Statement statement = connection.createStatement();
                 final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
                 this.queryPlan = pstmt.compileQuery(selectStatement);
+                // FIXME: why is getting the iterator necessary here? It
+                // causes the query to run.
                 this.queryPlan.iterator();
             } catch(Exception exception) {
                 LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));


[3/6] PHOENIX-1251 Salted queries with range scan become full table scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
index 8f9e553..4b20979 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
@@ -18,34 +18,22 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.analyzeTable;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
@@ -153,9 +141,8 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             assertTrue("Expected 1 row in result set", rs.next());
             assertEquals(2, rs.getInt(3));
             assertEquals("Viva Las Vegas", rs.getString(4));
-            conn1 = nextConnection(getUrl());
-            List<KeyRange> splits = getSplits(conn1, new Scan());
-            assertEquals(splits.size(), 5);
+            List<KeyRange> splits = getAllSplits(conn1, TENANT_TABLE_NAME);
+            assertEquals(3, splits.size());
         }
         finally {
             conn1.close();
@@ -490,10 +477,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
         }
     }
     
-    private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException {
-        String query = "ANALYZE " + tableName;
-        conn.createStatement().execute(query);
-    }
     @Test
     public void testUpsertValuesUsingViewWithNoWhereClause() throws Exception {
         Connection conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -508,34 +491,4 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
         assertFalse(rs.next());
         conn.close();
     }
-    private static List<KeyRange> getSplits(Connection conn, final Scan scan) throws SQLException {
-        TableRef tableRef = getTableRef(conn);
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(
-                tableRef.getTable().getPhysicalName().getBytes());
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement));
-        DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(),
-                HintNode.EMPTY_HINT_NODE) {
-            @Override
-            protected List<HRegionLocation> getAllRegions() throws SQLException {
-                return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(),
-                        scan.getStopRow());
-            }
-        };
-        List<KeyRange> keyRanges = splitter.getSplits();
-        Collections.sort(keyRanges, new Comparator<KeyRange>() {
-            @Override
-            public int compare(KeyRange o1, KeyRange o2) {
-                return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange());
-            }
-        });
-        return keyRanges;
-    }
-    protected static TableRef getTableRef(Connection conn) throws SQLException {
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable(
-                new PTableKey(pconn.getTenantId(), PARENT_TABLE_NAME)), System.currentTimeMillis(), false);
-        return table;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
index d5e9d42..8f7912a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
@@ -145,7 +145,7 @@ public class SaltedIndexIT extends BaseIndexIT {
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         expectedPlan = indexSaltBuckets == null ? 
              "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [~'y']" : 
-            ("CLIENT PARALLEL 4-WAY SKIP SCAN ON 4 KEYS OVER " + INDEX_TABLE_FULL_NAME + " [0,~'y'] - [3,~'y']\n" + 
+            ("CLIENT PARALLEL 4-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [0,~'y']\n" + 
              "CLIENT MERGE SORT");
         assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs));
 
@@ -164,7 +164,7 @@ public class SaltedIndexIT extends BaseIndexIT {
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         expectedPlan = indexSaltBuckets == null ? 
             "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [*] - [~'x']" :
-            ("CLIENT PARALLEL 4-WAY SKIP SCAN ON 4 RANGES OVER " + INDEX_TABLE_FULL_NAME + " [0,*] - [3,~'x']\n" + 
+            ("CLIENT PARALLEL 4-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [0,*] - [0,~'x']\n" + 
              "CLIENT MERGE SORT");
         assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs));
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index a6ee92e..267aec9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.ImmutableSet;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
@@ -51,11 +49,14 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * 
  * Client for sending cache to each region server
@@ -145,15 +146,18 @@ public class ServerCacheClient {
         ExecutorService executor = services.getExecutor();
         List<Future<Boolean>> futures = Collections.emptyList();
         try {
-            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+            PTable cacheUsingTable = cacheUsingTableRef.getTable();
+            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
             int nRegions = locations.size();
             // Size these based on worst case
             futures = new ArrayList<Future<Boolean>>(nRegions);
             Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
             for (HRegionLocation entry : locations) {
                 // Keep track of servers we've sent to and only send once
+                byte[] regionStartKey = entry.getRegionInfo().getStartKey();
+                byte[] regionEndKey = entry.getRegionInfo().getEndKey();
                 if ( ! servers.contains(entry) && 
-                        keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) {  // Call RPC once per server
+                        keyRanges.intersects(regionStartKey, regionEndKey, 0) ) {
                     servers.add(entry);
                     if (LOG.isDebugEnabled()) {LOG.debug("Adding cache entry to be sent for " + entry);}
                     final byte[] key = entry.getRegionInfo().getStartKey();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index b5c14f0..271ba30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -64,4 +64,6 @@ public interface QueryPlan extends StatementPlan {
     FilterableStatement getStatement();
 
     public boolean isDegenerate();
+    
+    public boolean isRowKeyOrdered();
 }
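
The new isRowKeyOrdered() lets callers ask whether a plan returns rows in row key order without inspecting its internals. A hypothetical implementation for a scan-backed plan might look like this sketch (not code from the patch; groupBy and orderBy are assumed fields of the plan):

    @Override
    public boolean isRowKeyOrdered() {
        // A plain scan preserves row key order unless the plan re-sorts;
        // an aggregate preserves it only if the GROUP BY is order-preserving.
        return groupBy.isEmpty()
                ? orderBy.getOrderByExpressions().isEmpty()
                : groupBy.isOrderPreserving();
    }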

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index dc8e0b3..1bd8cef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -23,12 +23,16 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -40,28 +44,40 @@ import com.google.common.collect.Lists;
 public class ScanRanges {
     private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
     private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
-    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, false, false);
-    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, false, false);
+    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, KeyRange.EVERYTHING_RANGE, KeyRange.EVERYTHING_RANGE, false, false, null);
+    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, KeyRange.EMPTY_RANGE, KeyRange.EMPTY_RANGE, false, false, null);
+    private static final Scan HAS_INTERSECTION = new Scan();
 
     public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan) {
-        return create(schema, ranges, slotSpan, false, null);
+        return create(schema, ranges, slotSpan, KeyRange.EVERYTHING_RANGE, false, null);
     }
     
-    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, boolean forceRangeScan, Integer nBuckets) {
-        int offset = nBuckets == null ? 0 : 1;
-        if (ranges.size() == offset) {
+    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, KeyRange minMaxRange, boolean forceRangeScan, Integer nBuckets) {
+        int offset = nBuckets == null ? 0 : SaltingUtil.NUM_SALTING_BYTES;
+        if (ranges.size() == offset && minMaxRange == KeyRange.EVERYTHING_RANGE) {
             return EVERYTHING;
-        } else if (ranges.size() == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE) {
+        } else if (minMaxRange == KeyRange.EMPTY_RANGE || (ranges.size() == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE)) {
             return NOTHING;
         }
         boolean isPointLookup = !forceRangeScan && ScanRanges.isPointLookup(schema, ranges, slotSpan);
         if (isPointLookup) {
-            // TODO: consider keeping original to use for serialization as it would
-            // be smaller?
+            // TODO: consider keeping original to use for serialization as it would be smaller?
             List<byte[]> keys = ScanRanges.getPointKeys(ranges, slotSpan, schema, nBuckets);
             List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size());
+            KeyRange unsaltedMinMaxRange = minMaxRange;
+            if (nBuckets != null && minMaxRange != KeyRange.EVERYTHING_RANGE) {
+                unsaltedMinMaxRange = KeyRange.getKeyRange(
+                        stripPrefix(minMaxRange.getLowerRange(),offset),
+                        minMaxRange.lowerUnbound(), 
+                        stripPrefix(minMaxRange.getUpperRange(),offset), 
+                        minMaxRange.upperUnbound());
+            }
             for (byte[] key : keys) {
-                keyRanges.add(KeyRange.getKeyRange(key));
+                // Filter now based on unsalted minMaxRange and ignore the point key salt byte
+                if ( unsaltedMinMaxRange.compareLowerToUpperBound(key, offset, key.length-offset, true) <= 0 &&
+                     unsaltedMinMaxRange.compareUpperToLowerBound(key, offset, key.length-offset, true) >= 0) {
+                    keyRanges.add(KeyRange.getKeyRange(key));
+                }
             }
             ranges = Collections.singletonList(keyRanges);
             if (keys.size() > 1) {
@@ -72,39 +88,286 @@ public class ScanRanges {
                 // when there's a single key.
                 slotSpan = new int[] {schema.getMaxFields()-1};
             }
-        } else if (nBuckets != null) {
-            List<List<KeyRange>> saltedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
-            saltedRanges.add(SaltingUtil.generateAllSaltingRanges(nBuckets));
-            saltedRanges.addAll(ranges.subList(1, ranges.size()));
-            ranges = saltedRanges;
         }
-        return new ScanRanges(schema, slotSpan, ranges, forceRangeScan, isPointLookup);
+        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
+        for (int i = 0; i < ranges.size(); i++) {
+            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
+            Collections.sort(sorted, KeyRange.COMPARATOR);
+            sortedRanges.add(ImmutableList.copyOf(sorted));
+        }
+        boolean useSkipScanFilter = useSkipScanFilter(forceRangeScan, isPointLookup, sortedRanges);
+        
+        // Don't set minMaxRange for point lookup because it causes issues during intersect
+        // by going across region boundaries
+        KeyRange scanRange = KeyRange.EVERYTHING_RANGE;
+        if (nBuckets == null || !isPointLookup || !useSkipScanFilter) {
+            byte[] minKey = ScanUtil.getMinKey(schema, sortedRanges, slotSpan);
+            byte[] maxKey = ScanUtil.getMaxKey(schema, sortedRanges, slotSpan);
+            // If the maxKey has crossed the salt byte boundary, then we do not
+            // have anything to filter at the upper end of the range
+            if (ScanUtil.crossesPrefixBoundary(maxKey, ScanUtil.getPrefix(minKey, offset), offset)) {
+                maxKey = KeyRange.UNBOUND;
+            }
+            // We won't filter anything at the low end of the range if we just have the salt byte
+            if (minKey.length <= offset) {
+                minKey = KeyRange.UNBOUND;
+            }
+            scanRange = KeyRange.getKeyRange(minKey, maxKey);
+        }
+        if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
+            minMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable());
+            scanRange = scanRange.intersect(minMaxRange);
+        }
+        
+        if (scanRange == KeyRange.EMPTY_RANGE) {
+            return NOTHING;
+        }
+        return new ScanRanges(schema, slotSpan, sortedRanges, scanRange, minMaxRange, useSkipScanFilter, isPointLookup, nBuckets);
     }
 
     private SkipScanFilter filter;
     private final List<List<KeyRange>> ranges;
     private final int[] slotSpan;
     private final RowKeySchema schema;
-    private final boolean forceRangeScan;
     private final boolean isPointLookup;
+    private final boolean isSalted;
+    private final boolean useSkipScanFilter;
+    private final KeyRange scanRange;
+    private final KeyRange minMaxRange;
 
-    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, boolean forceRangeScan, boolean isPointLookup) {
+    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, KeyRange scanRange, KeyRange minMaxRange, boolean useSkipScanFilter, boolean isPointLookup, Integer bucketNum) {
         this.isPointLookup = isPointLookup;
-        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
-        for (int i = 0; i < ranges.size(); i++) {
-            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
-            Collections.sort(sorted, KeyRange.COMPARATOR);
-            sortedRanges.add(ImmutableList.copyOf(sorted));
+        this.isSalted = bucketNum != null;
+        this.useSkipScanFilter = useSkipScanFilter;
+        this.scanRange = scanRange;
+        this.minMaxRange = minMaxRange;
+        
+        // Only blow out the bucket values if we're using the skip scan. We need all the
+        // bucket values in this case because we use intersect against a key that may have
+        // any of the possible bucket values. Otherwise, we can pretty easily ignore the
+        // bucket values.
+        if (useSkipScanFilter && isSalted && !isPointLookup) {
+            ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
         }
-        this.ranges = ImmutableList.copyOf(sortedRanges);
+        this.ranges = ImmutableList.copyOf(ranges);
         this.slotSpan = slotSpan;
         this.schema = schema;
-        if (schema != null && !ranges.isEmpty()) {
+        if (schema != null && !ranges.isEmpty()) { // TODO: only create if useSkipScanFilter is true?
             this.filter = new SkipScanFilter(this.ranges, slotSpan, schema);
         }
-        this.forceRangeScan = forceRangeScan;
+    }
+    
+    /**
+     * Get the minMaxRange that is applied in addition to the scan range.
+     * Only used by the ExplainTable to generate the explain plan.
+     */
+    public KeyRange getMinMaxRange() {
+        return minMaxRange;
+    }
+    
+    public void initializeScan(Scan scan) {
+        scan.setStartRow(scanRange.getLowerRange());
+        scan.setStopRow(scanRange.getUpperRange());
+    }
+    
+    private static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
+        if (key.length > 0) {
+            byte[] newKey = new byte[key.length + prefixKeyOffset];
+            int totalKeyOffset = keyOffset + prefixKeyOffset;
+            if (prefixKey.length >= totalKeyOffset) { // otherwise it's null padded
+                System.arraycopy(prefixKey, 0, newKey, 0, totalKeyOffset);
+            }
+            System.arraycopy(key, keyOffset, newKey, totalKeyOffset, key.length - keyOffset);
+            return newKey;
+        }
+        return key;
+    }
+    
+    private static byte[] replaceSaltByte(byte[] key, byte[] saltKey) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length];
+        if (saltKey.length >= SaltingUtil.NUM_SALTING_BYTES) { // Otherwise it's null padded
+            System.arraycopy(saltKey, 0, temp, 0, SaltingUtil.NUM_SALTING_BYTES);
+        }
+        System.arraycopy(key, SaltingUtil.NUM_SALTING_BYTES, temp, SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES);
+        return temp;
+    }
+    
+    private static byte[] stripPrefix(byte[] key, int keyOffset) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length - keyOffset];
+        System.arraycopy(key, keyOffset, temp, 0, key.length - keyOffset);
+        return temp;
+    }
+    
+    public Scan intersectScan(Scan scan, final byte[] originalStartKey, final byte[] originalStopKey, final int keyOffset) {
+        byte[] startKey = originalStartKey;
+        byte[] stopKey = originalStopKey;
+        boolean mayHaveRows = false;
+        // Keep the keys as they are if we have a point lookup, as we've already resolved the
+        // salt bytes in that case.
+        final int scanKeyOffset = this.isSalted && !this.isPointLookup ? SaltingUtil.NUM_SALTING_BYTES : 0;
+        assert (scanKeyOffset == 0 || keyOffset == 0);
+        // Offset for startKey/stopKey. Either 1 for salted tables or the prefix length
+        // of the current region for local indexes.
+        final int totalKeyOffset = scanKeyOffset + keyOffset;
+        // In this case, we've crossed the "prefix" boundary and should consider everything after the startKey
+        // This prevents us from having to prefix the key prior to knowing whether or not there may be an
+        // intersection.
+        byte[] prefixBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (totalKeyOffset > 0) {
+            prefixBytes = ScanUtil.getPrefix(startKey, totalKeyOffset);
+            if (ScanUtil.crossesPrefixBoundary(stopKey, prefixBytes, totalKeyOffset)) {
+                stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+            }
+        }
+        int scanStartKeyOffset = scanKeyOffset;
+        byte[] scanStartKey = scan == null ? ByteUtil.EMPTY_BYTE_ARRAY : scan.getStartRow();
+        // Compare ignoring key prefix and salt byte
+        if (scanStartKey.length > 0) {
+            if (startKey.length > 0 && Bytes.compareTo(scanStartKey, scanKeyOffset, scanStartKey.length - scanKeyOffset, startKey, totalKeyOffset, startKey.length - totalKeyOffset) < 0) {
+                scanStartKey = startKey;
+                scanStartKeyOffset = totalKeyOffset;
+            }
+        } else {
+            scanStartKey = startKey;
+            scanStartKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        int scanStopKeyOffset = scanKeyOffset;
+        byte[] scanStopKey = scan == null ? ByteUtil.EMPTY_BYTE_ARRAY : scan.getStopRow();
+        if (scanStopKey.length > 0) {
+            if (stopKey.length > 0 && Bytes.compareTo(scanStopKey, scanKeyOffset, scanStopKey.length - scanKeyOffset, stopKey, totalKeyOffset, stopKey.length - totalKeyOffset) > 0) {
+                scanStopKey = stopKey;
+                scanStopKeyOffset = totalKeyOffset;
+            }
+        } else {
+            scanStopKey = stopKey;
+            scanStopKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        mayHaveRows = mayHaveRows || Bytes.compareTo(scanStartKey, scanStartKeyOffset, scanStartKey.length - scanStartKeyOffset, scanStopKey, scanStopKeyOffset, scanStopKey.length - scanStopKeyOffset) < 0;
+        
+        if (!mayHaveRows) {
+            return null;
+        }
+        if (originalStopKey.length != 0 && scanStopKey.length == 0) {
+            scanStopKey = originalStopKey;
+        }
+        Filter newFilter = null;
+        // If the scan is using skip scan filter, intersect and replace the filter.
+        if (scan == null || this.useSkipScanFilter()) {
+            byte[] skipScanStartKey = scanStartKey;
+            byte[] skipScanStopKey = scanStopKey;
+            // If we have a keyOffset and we've used the startKey/stopKey that
+            // were passed in (which have the prefix) for the above range check,
+            // we need to remove the prefix before running our intersect method.
+            // TODO: we could use skipScanFilter.setOffset(keyOffset) if both
+            // the startKey and stopKey were used above *and* our intersect
+            // method honored the skipScanFilter.offset variable.
+            if (scanKeyOffset > 0) {
+                if (skipScanStartKey != originalStartKey) { // original already has correct salt byte
+                    skipScanStartKey = replaceSaltByte(skipScanStartKey, prefixBytes);
+                }
+                if (skipScanStopKey != originalStopKey) {
+                    skipScanStopKey = replaceSaltByte(skipScanStopKey, prefixBytes);
+                }
+            } else if (keyOffset > 0) {
+                if (skipScanStartKey == originalStartKey) {
+                    skipScanStartKey = stripPrefix(skipScanStartKey, keyOffset);
+                }
+                if (skipScanStopKey == originalStopKey) {
+                    skipScanStopKey = stripPrefix(skipScanStopKey, keyOffset);
+                }
+            }
+            if (scan == null) {
+                return filter.hasIntersect(skipScanStartKey, skipScanStopKey) ? HAS_INTERSECTION : null;
+            }
+            Filter filter = scan.getFilter();
+            SkipScanFilter newSkipScanFilter = null;
+            if (filter instanceof SkipScanFilter) {
+                SkipScanFilter oldSkipScanFilter = (SkipScanFilter)filter;
+                newFilter = newSkipScanFilter = oldSkipScanFilter.intersect(skipScanStartKey, skipScanStopKey);
+                if (newFilter == null) {
+                    return null;
+                }
+            } else if (filter instanceof FilterList) {
+                FilterList oldList = (FilterList)filter;
+                FilterList newList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+                newFilter = newList;
+                for (Filter f : oldList.getFilters()) {
+                    if (f instanceof SkipScanFilter) {
+                        newSkipScanFilter = ((SkipScanFilter)f).intersect(skipScanStartKey, skipScanStopKey);
+                        if (newSkipScanFilter == null) {
+                            return null;
+                        }
+                        newList.addFilter(newSkipScanFilter);
+                    } else {
+                        newList.addFilter(f);
+                    }
+                }
+            }
+            // TODO: it seems that our SkipScanFilter or HBase runs into problems if we don't
+            // have an enclosing range when we do a point lookup.
+            if (isPointLookup) {
+                scanStartKey = ScanUtil.getMinKey(schema, newSkipScanFilter.getSlots(), slotSpan);
+                scanStopKey = ScanUtil.getMaxKey(schema, newSkipScanFilter.getSlots(), slotSpan);
+            }
+        }
+        if (newFilter == null) {
+            newFilter = scan.getFilter();
+        }
+        Scan newScan = ScanUtil.newScan(scan);
+        newScan.setFilter(newFilter);
+        // If we have an offset (salted table or local index), we need to make sure to
+        // prefix our scan start/stop row by the prefix of the startKey or stopKey that
+        // were passed in. Our scan either doesn't have the prefix or has a placeholder
+        // for it.
+        if (totalKeyOffset > 0) {
+            if (scanStartKey != originalStartKey) {
+                scanStartKey = prefixKey(scanStartKey, scanKeyOffset, prefixBytes, keyOffset);
+            }
+            if (scanStopKey != originalStopKey) {
+                scanStopKey = prefixKey(scanStopKey, scanKeyOffset, prefixBytes, keyOffset);
+            }
+        }
+        newScan.setStartRow(scanStartKey);
+        newScan.setStopRow(scanStopKey);
+        
+        return newScan;
     }
 
+    /**
+     * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
+     * intersects with any of the scan ranges and false otherwise. We cannot pass in
+     * a KeyRange here, because the underlying compare functions expect lower inclusive
+     * and upper exclusive keys. We cannot get their next key because the key must
+     * conform to the row key schema and if a null byte is added to a lower inclusive
+     * key, it's no longer a valid, real key.
+     * @param lowerInclusiveKey lower inclusive key
+     * @param upperExclusiveKey upper exclusive key
+     * @return true if the scan range intersects with the specified lower/upper key
+     * range
+     */
+    public boolean intersects(byte[] lowerInclusiveKey, byte[] upperExclusiveKey, int keyOffset) {
+        if (isEverything()) {
+            return true;
+        }
+        if (isDegenerate()) {
+            return false;
+        }
+        
+        return intersectScan(null, lowerInclusiveKey, upperExclusiveKey, keyOffset) == HAS_INTERSECTION;
+    }
+    
     public SkipScanFilter getSkipScanFilter() {
         return filter;
     }
@@ -132,11 +395,15 @@ public class ScanRanges {
      *    not the last key slot
      */
     public boolean useSkipScanFilter() {
+        return useSkipScanFilter;
+    }
+    
+    private static boolean useSkipScanFilter(boolean forceRangeScan, boolean isPointLookup, List<List<KeyRange>> ranges) {
         if (forceRangeScan) {
             return false;
         }
         if (isPointLookup) {
-            return getPointLookupCount() > 1;
+            return getPointLookupCount(isPointLookup, ranges) > 1;
         }
         boolean hasRangeKey = false, useSkipScan = false;
         for (List<KeyRange> orRanges : ranges) {
@@ -208,6 +475,10 @@ public class ScanRanges {
     }
     
     public int getPointLookupCount() {
+        return getPointLookupCount(isPointLookup, ranges);
+    }
+    
+    private static int getPointLookupCount(boolean isPointLookup, List<List<KeyRange>> ranges) {
         return isPointLookup ? ranges.get(0).size() : 0;
     }
     
@@ -215,51 +486,6 @@ public class ScanRanges {
         return isPointLookup ? ranges.get(0).iterator() : Iterators.<KeyRange>emptyIterator();
     }
 
-    public void setScanStartStopRow(Scan scan) {
-        if (isEverything()) {
-            return;
-        }
-        if (isDegenerate()) {
-            scan.setStartRow(KeyRange.EMPTY_RANGE.getLowerRange());
-            scan.setStopRow(KeyRange.EMPTY_RANGE.getUpperRange());
-            return;
-        }
-        
-        byte[] expectedKey;
-        expectedKey = ScanUtil.getMinKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStartRow(expectedKey);
-        }
-        expectedKey = ScanUtil.getMaxKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStopRow(expectedKey);
-        }
-    }
-
-    public static final ImmutableBytesWritable UNBOUND = new ImmutableBytesWritable(KeyRange.UNBOUND);
-
-    /**
-     * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
-     * intersects with any of the scan ranges and false otherwise. We cannot pass in
-     * a KeyRange here, because the underlying compare functions expect lower inclusive
-     * and upper exclusive keys. We cannot get their next key because the key must
-     * conform to the row key schema and if a null byte is added to a lower inclusive
-     * key, it's no longer a valid, real key.
-     * @param lowerInclusiveKey lower inclusive key
-     * @param upperExclusiveKey upper exclusive key
-     * @return true if the scan range intersects with the specified lower/upper key
-     * range
-     */
-    public boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
-        if (isEverything()) {
-            return true;
-        }
-        if (isDegenerate()) {
-            return false;
-        }
-        return filter.hasIntersect(lowerInclusiveKey, upperExclusiveKey);
-    }
-    
     public int getPkColumnSpan() {
         return this == ScanRanges.NOTHING ? 0 : ScanUtil.getTotalSpan(ranges, slotSpan);
     }
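
To make the byte-level helpers above concrete, here is a self-contained sketch of what stripPrefix and replaceSaltByte do (Phoenix's SaltingUtil.NUM_SALTING_BYTES is 1; the key values are illustrative):

    import java.util.Arrays;

    public class SaltByteExample {
        static final int NUM_SALTING_BYTES = 1; // one salt byte per row key

        // Mirrors ScanRanges.stripPrefix: drop the salt byte (or region
        // prefix) so a key can be compared against unsalted key ranges.
        static byte[] stripPrefix(byte[] key, int keyOffset) {
            if (key.length == 0) return key;
            return Arrays.copyOfRange(key, keyOffset, key.length);
        }

        // Mirrors ScanRanges.replaceSaltByte: overwrite the salt byte of a
        // key with the salt byte of the region being intersected against.
        static byte[] replaceSaltByte(byte[] key, byte[] saltKey) {
            if (key.length == 0) return key;
            byte[] temp = key.clone();
            if (saltKey.length >= NUM_SALTING_BYTES) { // otherwise null padded
                System.arraycopy(saltKey, 0, temp, 0, NUM_SALTING_BYTES);
            }
            return temp;
        }

        public static void main(String[] args) {
            byte[] saltedKey = {3, 'a', 'b', 'c'}; // bucket 3, row key "abc"
            byte[] regionStart = {0};              // region hosting bucket 0
            // [97, 98, 99]: the unsalted row key bytes for "abc"
            System.out.println(Arrays.toString(stripPrefix(saltedKey, NUM_SALTING_BYTES)));
            // [0, 97, 98, 99]: the same row key re-targeted at bucket 0
            System.out.println(Arrays.toString(replaceSaltByte(saltedKey, regionStart)));
        }
    }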

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 3cb6ce9..887ca3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -31,7 +31,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -39,7 +38,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
-import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Maps;
 
@@ -67,7 +65,6 @@ public class StatementContext {
     
     private long currentTime = QueryConstants.UNSET_TIMESTAMP;
     private ScanRanges scanRanges = ScanRanges.EVERYTHING;
-    private KeyRange minMaxRange = null;
     private final SequenceManager sequences; 
 
     private TableRef currentTable;
@@ -158,41 +155,8 @@ public class StatementContext {
     }
     
     public void setScanRanges(ScanRanges scanRanges) {
-        setScanRanges(scanRanges, null);
-    }
-
-    public void setScanRanges(ScanRanges scanRanges, KeyRange minMaxRange) {
         this.scanRanges = scanRanges;
-        this.scanRanges.setScanStartStopRow(scan);
-        PTable table = this.getCurrentTable().getTable();
-        if (minMaxRange != null) {
-            // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
-            // what we need to intersect against for the HBase scan.
-            byte[] lowerRange = minMaxRange.getLowerRange();
-            if (!minMaxRange.lowerUnbound()) {
-                if (!minMaxRange.isLowerInclusive()) {
-                    lowerRange = ScanUtil.nextKey(lowerRange, table, tempPtr);
-                }
-            }
-            
-            byte[] upperRange = minMaxRange.getUpperRange();
-            if (!minMaxRange.upperUnbound()) {
-                if (minMaxRange.isUpperInclusive()) {
-                    upperRange = ScanUtil.nextKey(upperRange, table, tempPtr);
-                }
-            }
-            if (minMaxRange.getLowerRange() != lowerRange || minMaxRange.getUpperRange() != upperRange) {
-                minMaxRange = KeyRange.getKeyRange(lowerRange, true, upperRange, false);
-            }
-            // If we're not salting, we can intersect this now with the scan range.
-            // Otherwise, we have to wait to do this when we chunk up the scan.
-            if (table.getBucketNum() == null) {
-                minMaxRange = minMaxRange.intersect(KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow()));
-                scan.setStartRow(minMaxRange.getLowerRange());
-                scan.setStopRow(minMaxRange.getUpperRange());
-            }
-            this.minMaxRange = minMaxRange;
-        }
+        scanRanges.initializeScan(scan);
     }
     
     public PhoenixConnection getConnection() {
@@ -224,14 +188,6 @@ public class StatementContext {
         return currentTime;
     }
 
-    /**
-     * Get the key range derived from row value constructor usage in where clause. These are orthogonal to the ScanRanges
-     * and form a range for which each scan is intersected against.
-     */
-    public KeyRange getMinMaxRange () {
-        return minMaxRange;
-    }
-    
     public SequenceManager getSequenceManager(){
         return sequences;
     }

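The block removed from setScanRanges(...) above normalized the RVC-derived min/max range into the lower-inclusive / upper-exclusive form an HBase scan expects, stepping an exclusive lower bound or inclusive upper bound to the next key. A standalone sketch of that normalization for variable-width keys (hypothetical names, not from this patch; the real ScanUtil.nextKey also handles fixed-width columns, which would increment the trailing byte instead):

    import java.util.Arrays;

    public class BoundNormalizationSketch {
        // Smallest key strictly greater than 'key' for variable-width,
        // unsigned-lexicographic row keys: append a 0x00 byte.
        static byte[] nextKey(byte[] key) {
            return Arrays.copyOf(key, key.length + 1); // pads with 0x00
        }

        public static void main(String[] args) {
            byte[] lower = "a".getBytes(); // exclusive lower bound: (a, ...
            byte[] upper = "m".getBytes(); // inclusive upper bound: ..., m]
            byte[] lowerInclusive = nextKey(lower); // (a  becomes  [a\x00
            byte[] upperExclusive = nextKey(upper); // m]  becomes  m\x00)
            System.out.println(Arrays.toString(lowerInclusive)); // [97, 0]
            System.out.println(Arrays.toString(upperExclusive)); // [109, 0]
        }
    }
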
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 9cd7e01..634bd15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -58,6 +58,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -146,14 +147,30 @@ public class WhereOptimizer {
         RowKeySchema schema = table.getRowKeySchema();
         List<List<KeyRange>> cnf = Lists.newArrayListWithExpectedSize(schema.getMaxFields());
         KeyRange minMaxRange = keySlots.getMinMaxRange();
-        boolean hasMinMaxRange = (minMaxRange != null);
+        if (minMaxRange == null) {
+            minMaxRange = KeyRange.EVERYTHING_RANGE;
+        }
+        boolean hasMinMaxRange = (minMaxRange != KeyRange.EVERYTHING_RANGE);
         int minMaxRangeOffset = 0;
         byte[] minMaxRangePrefix = null;
+        boolean isSalted = nBuckets != null;
+        boolean isMultiTenant = tenantId != null && table.isMultiTenant();
+        boolean hasViewIndex = table.getViewIndexId() != null;
+        if (hasMinMaxRange) {
+            int minMaxRangeSize = (isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0)
+                    + (isMultiTenant ? tenantId.getBytes().length + 1 : 0) 
+                    + (hasViewIndex ? MetaDataUtil.getViewIndexIdDataType().getByteSize() : 0);
+            minMaxRangePrefix = new byte[minMaxRangeSize];
+        }
         
         Iterator<KeyExpressionVisitor.KeySlot> iterator = keySlots.iterator();
         // Add placeholder for salt byte ranges
-        if (nBuckets != null) {
+        if (isSalted) {
             cnf.add(SALT_PLACEHOLDER);
+            if (hasMinMaxRange) {
+                System.arraycopy(SALT_PLACEHOLDER.get(0).getLowerRange(), 0, minMaxRangePrefix, minMaxRangeOffset, SaltingUtil.NUM_SALTING_BYTES);
+                minMaxRangeOffset += SaltingUtil.NUM_SALTING_BYTES;
+            }
             // Increment the pkPos, as the salt column is in the row schema
             // Do not increment the iterator, though, as there will never be
             // an expression in the keySlots for the salt column
@@ -161,13 +178,12 @@ public class WhereOptimizer {
         }
         
         // Add tenant data isolation for tenant-specific tables
-        if (tenantId != null && table.isMultiTenant()) {
+        if (isMultiTenant) {
             byte[] tenantIdBytes = tenantId.getBytes();
             KeyRange tenantIdKeyRange = KeyRange.getKeyRange(tenantIdBytes);
             cnf.add(singletonList(tenantIdKeyRange));
             if (hasMinMaxRange) {
-                minMaxRangePrefix = new byte[tenantIdBytes.length + MetaDataUtil.getViewIndexIdDataType().getByteSize() + 1];
-                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, 0, tenantIdBytes.length);
+                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, minMaxRangeOffset, tenantIdBytes.length);
                 minMaxRangeOffset += tenantIdBytes.length;
                 if (!schema.getField(pkPos).getDataType().isFixedWidth()) {
                     minMaxRangePrefix[minMaxRangeOffset] = QueryConstants.SEPARATOR_BYTE;
@@ -178,14 +194,11 @@ public class WhereOptimizer {
         }
         // Add unique index ID for shared indexes on views. This ensures
         // that different indexes don't interleave.
-        if (table.getViewIndexId() != null) {
+        if (hasViewIndex) {
             byte[] viewIndexBytes = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
             KeyRange indexIdKeyRange = KeyRange.getKeyRange(viewIndexBytes);
             cnf.add(singletonList(indexIdKeyRange));
             if (hasMinMaxRange) {
-                if (minMaxRangePrefix == null) {
-                    minMaxRangePrefix = new byte[viewIndexBytes.length];
-                }
                 System.arraycopy(viewIndexBytes, 0, minMaxRangePrefix, minMaxRangeOffset, viewIndexBytes.length);
                 minMaxRangeOffset += viewIndexBytes.length;
             }
@@ -194,7 +207,7 @@ public class WhereOptimizer {
         
         // Prepend minMaxRange with fixed column values so we can properly intersect the
         // range with the other range.
-        if (minMaxRange != null) {
+        if (hasMinMaxRange) {
             minMaxRange = minMaxRange.prependRange(minMaxRangePrefix, 0, minMaxRangeOffset);
         }
         boolean forcedSkipScan = statement.getHint().hasHint(Hint.SKIP_SCAN);
@@ -285,9 +298,9 @@ public class WhereOptimizer {
         // If we have fully qualified point keys with multi-column spans (i.e. RVC),
         // we can still use our skip scan. The ScanRanges.create() call will explode
         // out the keys.
-        context.setScanRanges(
-                ScanRanges.create(schema, cnf, Arrays.copyOf(slotSpan, cnf.size()), forcedRangeScan, nBuckets),
-                minMaxRange);
+        slotSpan = Arrays.copyOf(slotSpan, cnf.size());
+        ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpan, minMaxRange, forcedRangeScan, nBuckets);
+        context.setScanRanges(scanRanges);
         if (whereClause == null) {
             return null;
         } else {

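The minMaxRangeSize arithmetic above reserves one buffer for every fixed leading row key part, so the min/max range can be prepended once and then intersected with the other slot ranges. A standalone sketch of that computation (the constants are illustrative stand-ins for SaltingUtil.NUM_SALTING_BYTES and the view index ID type width):

    public class MinMaxPrefixSketch {
        static final int NUM_SALTING_BYTES = 1;   // stand-in for SaltingUtil.NUM_SALTING_BYTES
        static final int VIEW_INDEX_ID_BYTES = 2; // stand-in for the view index ID byte size

        static int prefixSize(boolean isSalted, byte[] tenantId, boolean hasViewIndex) {
            return (isSalted ? NUM_SALTING_BYTES : 0)
                    // a variable-width tenant ID is followed by a separator byte
                    + (tenantId != null ? tenantId.length + 1 : 0)
                    + (hasViewIndex ? VIEW_INDEX_ID_BYTES : 0);
        }

        public static void main(String[] args) {
            // salted + tenant "abc" + view index: 1 + (3 + 1) + 2 = 7 prefix bytes
            System.out.println(prefixSize(true, "abc".getBytes(), true));
        }
    }
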
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index a1d943c..f1f05be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -110,7 +110,6 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.stat.PTableStats;
 import org.apache.phoenix.schema.stat.PTableStatsImpl;
-import org.apache.phoenix.schema.stat.StatisticsUtils;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -239,8 +238,14 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         int length = getVarCharLength(keyBuffer, keyOffset, keyLength);
         return PNameFactory.newName(keyBuffer, keyOffset, length);
     }
-    
-    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp) throws IOException {
+
+    private static Scan newTableRowsScan(byte[] key)
+            throws IOException {
+        return newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP, HConstants.LATEST_TIMESTAMP);
+    }
+
+    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp)
+            throws IOException {
         Scan scan = new Scan();
         scan.setTimeRange(startTimeStamp, stopTimeStamp);
         scan.setStartRow(key);
@@ -454,7 +459,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         }
         PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getTableName(
                 schemaName.getString(), tableName.getString())) : physicalTables.get(0);
-        PTableStats stats = updateStatsInternal(physicalTableName.getBytes(), columns);
+        PTableStats stats = tenantId == null ? updateStatsInternal(physicalTableName.getBytes()) : null;
         return PTableImpl
                 .makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName,
                         saltBucketNum, columns, tableType == INDEX ? dataTableName : null, indexes, isImmutableRows,
@@ -462,62 +467,48 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                         viewIndexId, stats);
     }
     
-    private PTableStats updateStatsInternal(byte[] tableNameBytes, List<PColumn> columns) throws IOException {
-        List<PName> family = Lists.newArrayListWithExpectedSize(columns.size());
-        for (PColumn column : columns) {
-            PName familyName = column.getFamilyName();
-            if (familyName != null) {
-                family.add(familyName);
-            }
-        }
+    private PTableStats updateStatsInternal(byte[] tableNameBytes) throws IOException {
         HTable statsHTable = null;
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         try {
             // Can we do a new HTable instance here? Or get it from a pool or cache of these instances?
             statsHTable = new HTable(getEnvironment().getConfiguration(), PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
-            Scan s = new Scan();
-            if (tableNameBytes != null) {
-                // Check for an efficient way here
-                s.setStartRow(tableNameBytes);
-                s.setStopRow(ByteUtil.nextKey(tableNameBytes));
-            }
+            Scan s = newTableRowsScan(tableNameBytes);
+            s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
             ResultScanner scanner = statsHTable.getScanner(s);
             Result result = null;
-            byte[] fam = null;
-            List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(columns.size());
             TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
             while ((result = scanner.next()) != null) {
-                KeyValue[] kvs = result.raw();
-                for(KeyValue kv : kvs) {
-                    // For now collect only guide posts
-                    if (Bytes.equals(kv.getQualifier(), PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES)) {
-                        byte[] cfInCell = StatisticsUtils.getCFFromRowKey(tableNameBytes, kv.getRow());
-                        if (fam == null) {
-                            fam = cfInCell;
-                        } else if (!Bytes.equals(fam, cfInCell)) {
-                            // Sort all the guide posts
-                            guidePostsPerCf.put(cfInCell, guidePosts);
-                            guidePosts = new ArrayList<byte[]>();
-                            fam = cfInCell;
-                        }
-                        byte[] guidePostVal = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(),
-                                kv.getValueLength()).copyBytesIfNecessary();
-                        PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePostVal);
-                        if (array != null && array.getDimensions() != 0) {
-                            for (int j = 0; j < array.getDimensions(); j++) {
-                                byte[] gp = array.toBytes(j);
-                                if (gp.length != 0) {
-                                    guidePosts.add(gp);
-                                }
-                            }
+                KeyValue current = result.raw()[0];
+                int tableNameLength = tableNameBytes.length + 1;
+                int cfOffset = current.getRowOffset() + tableNameLength;
+                int cfLength = getVarCharLength(current.getRow(), cfOffset, current.getRowLength() - tableNameLength);
+                ptr.set(current.getRow(), cfOffset, cfLength);
+                byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getValue(), current.getValueOffset(), current
+                        .getValueLength());
+                if (array != null && array.getDimensions() != 0) {
+                    List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions());                        
+                    for (int j = 0; j < array.getDimensions(); j++) {
+                        byte[] gp = array.toBytes(j);
+                        if (gp.length != 0) {
+                            guidePosts.add(gp);
                         }
                     }
+                    List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts);
+                    if (gps != null) { // Merge in guideposts already collected for this CF from other regions
+                        guidePosts.addAll(gps);
+                    }
                 }
             }
-            if (fam != null) {
-                // Sort all the guideposts
-                guidePostsPerCf.put(fam, guidePosts);
+            if (!guidePostsPerCf.isEmpty()) {
+                // Sort the guideposts, since map insertion order above depends on the
+                // order in which each region's guideposts were traversed.
+                for (List<byte[]> gps : guidePostsPerCf.values()) {
+                    Collections.sort(gps, Bytes.BYTES_COMPARATOR);
+                }
+                return new PTableStatsImpl(guidePostsPerCf);
             }
-            return new PTableStatsImpl(guidePostsPerCf);
         } catch (Exception e) {
             if (e instanceof org.apache.hadoop.hbase.TableNotFoundException) {
                 logger.warn("Stats table not yet online", e);

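The rewritten updateStatsInternal(...) accumulates guideposts per column family across regions, folding together lists for a CF seen more than once and sorting only at the end, since region traversal order is not key order. A standalone sketch of that merge (hypothetical names; Strings stand in for the real CF byte[] keys):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.TreeMap;

    public class GuidePostMergeSketch {
        public static void main(String[] args) {
            TreeMap<String, List<byte[]>> guidePostsPerCf = new TreeMap<>();
            // Two "regions" reporting guideposts for the same column family "0".
            collect(guidePostsPerCf, "0", new byte[]{9}, new byte[]{3});
            collect(guidePostsPerCf, "0", new byte[]{5});
            // Sort once at the end; region traversal order is not key order.
            for (List<byte[]> gps : guidePostsPerCf.values()) {
                gps.sort(GuidePostMergeSketch::compare);
            }
            for (byte[] gp : guidePostsPerCf.get("0")) {
                System.out.println(gp[0]); // 3, 5, 9
            }
        }

        // Mirrors the put-then-merge step above: put returns the previous
        // list for this CF, which is folded into the new one.
        static void collect(TreeMap<String, List<byte[]>> map, String cf, byte[]... gps) {
            List<byte[]> guidePosts = new ArrayList<>(Arrays.asList(gps));
            List<byte[]> previous = map.put(cf, guidePosts);
            if (previous != null) {
                guidePosts.addAll(previous);
            }
        }

        // Unsigned lexicographic byte[] comparison (Bytes.BYTES_COMPARATOR equivalent).
        static int compare(byte[] a, byte[] b) {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int c = (a[i] & 0xff) - (b[i] & 0xff);
                if (c != 0) return c;
            }
            return a.length - b.length;
        }
    }
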
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index d45e036..9f294a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -65,7 +65,7 @@ import org.apache.phoenix.schema.TableRef;
  * 
  * @since 0.1
  */
-public class AggregatePlan extends BasicQueryPlan {
+public class AggregatePlan extends BaseQueryPlan {
     private final Aggregators aggregators;
     private final Expression having;
     private List<KeyRange> splits;
@@ -164,7 +164,7 @@ public class AggregatePlan extends BasicQueryPlan {
              */
             context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, PDataType.INTEGER.toBytes(limit));
         }
-        ParallelIterators parallelIterators = new ParallelIterators(context, tableRef, statement, projection, groupBy, null, wrapParallelIteratorFactory());
+        ParallelIterators parallelIterators = new ParallelIterators(this, null, wrapParallelIteratorFactory());
         splits = parallelIterators.getSplits();
 
         AggregatingResultIterator aggResultIterator;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
new file mode 100644
index 0000000..eb43aa4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ScanUtil;
+
+import com.google.common.collect.Lists;
+
+
+
+/**
+ *
+ * Query plan that has no child plans
+ *
+ * 
+ * @since 0.1
+ */
+public abstract class BaseQueryPlan implements QueryPlan {
+    protected static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K
+    
+    protected final TableRef tableRef;
+    protected final StatementContext context;
+    protected final FilterableStatement statement;
+    protected final RowProjector projection;
+    protected final ParameterMetaData paramMetaData;
+    protected final Integer limit;
+    protected final OrderBy orderBy;
+    protected final GroupBy groupBy;
+    protected final ParallelIteratorFactory parallelIteratorFactory;
+
+    protected BaseQueryPlan(
+            StatementContext context, FilterableStatement statement, TableRef table,
+            RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
+            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
+        this.context = context;
+        this.statement = statement;
+        this.tableRef = table;
+        this.projection = projection;
+        this.paramMetaData = paramMetaData;
+        this.limit = limit;
+        this.orderBy = orderBy;
+        this.groupBy = groupBy;
+        this.parallelIteratorFactory = parallelIteratorFactory;
+    }
+
+    @Override
+    public boolean isDegenerate() {
+        return context.getScanRanges() == ScanRanges.NOTHING;
+
+    }
+    
+    @Override
+    public GroupBy getGroupBy() {
+        return groupBy;
+    }
+
+    
+    @Override
+    public OrderBy getOrderBy() {
+        return orderBy;
+    }
+
+    @Override
+    public TableRef getTableRef() {
+        return tableRef;
+    }
+
+    @Override
+    public Integer getLimit() {
+        return limit;
+    }
+
+    @Override
+    public RowProjector getProjector() {
+        return projection;
+    }
+
+//    /**
+//     * Sets up an id used to do round robin queue processing on the server
+//     * @param scan
+//     */
+//    private void setProducer(Scan scan) {
+//        byte[] producer = Bytes.toBytes(UUID.randomUUID().toString());
+//        scan.setAttribute(HBaseServer.CALL_QUEUE_PRODUCER_ATTRIB_NAME, producer);
+//    }
+    
+    @Override
+    public final ResultIterator iterator() throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList());
+    }
+
+    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
+        if (context.getScanRanges() == ScanRanges.NOTHING) {
+            return ResultIterator.EMPTY_ITERATOR;
+        }
+        
+        Scan scan = context.getScan();
+        // Set producer on scan so HBase server does round robin processing
+        //setProducer(scan);
+        // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
+        // The time stamp comes from the server at compile time when the meta data
+        // is resolved.
+        // TODO: include time range in explain plan?
+        PhoenixConnection connection = context.getConnection();
+        if (context.getScanTimeRange() == null) {
+          Long scn = connection.getSCN();
+          if (scn == null) {
+            scn = context.getCurrentTime();
+            // Add one to server time since max of time range is exclusive
+            // and we need to account for OSes with lower-resolution clocks.
+            if (scn < HConstants.LATEST_TIMESTAMP) {
+              scn++;
+            }
+          }
+          ScanUtil.setTimeRange(scan, scn);
+        } else {
+          try {
+            scan.setTimeRange(context.getScanTimeRange().getMin(), context.getScanTimeRange().getMax());
+          } catch (IOException e) {
+            throw new SQLException(e);
+          }
+        }
+        ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes());
+        ResultIterator iterator = newIterator();
+        return dependencies.isEmpty() ? 
+                iterator : new DelegateResultIterator(iterator) {
+            @Override
+            public void close() throws SQLException {
+                try {
+                    super.close();
+                } finally {
+                    SQLCloseables.closeAll(dependencies);
+                }
+            }
+        };
+    }
+
+    protected abstract ResultIterator newIterator() throws SQLException;
+    
+    @Override
+    public long getEstimatedSize() {
+        return DEFAULT_ESTIMATED_SIZE;
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() {
+        return paramMetaData;
+    }
+
+    @Override
+    public FilterableStatement getStatement() {
+        return statement;
+    }
+
+    @Override
+    public StatementContext getContext() {
+        return context;
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        if (context.getScanRanges() == ScanRanges.NOTHING) {
+            return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + tableRef.getTable().getName().getString()));
+        }
+        
+        // Optimize here when getting explain plan, as queries don't get optimized until after compilation
+        QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
+        ResultIterator iterator = plan.iterator();
+        List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
+        iterator.explain(planSteps);
+        return new ExplainPlan(planSteps);
+    }
+
+    @Override
+    public boolean isRowKeyOrdered() {
+        return groupBy.isEmpty() ? orderBy.getOrderByExpressions().isEmpty() : groupBy.isOrderPreserving();
+    }
+}

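One subtlety in the iterator() above: when no SCN is set, the compile-time server timestamp is bumped by one before becoming the scan's upper time bound, because time range maxes are exclusive. A standalone sketch of just that decision (hypothetical names, not from this patch):

    public class TimeRangeSketch {
        static final long LATEST_TIMESTAMP = Long.MAX_VALUE; // stand-in for HConstants.LATEST_TIMESTAMP

        // Returns the exclusive upper bound for the scan's time range.
        static long upperBound(Long scn, long compileTimeServerTime) {
            if (scn != null) {
                return scn; // an explicit SCN is used as-is
            }
            long ts = compileTimeServerTime;
            if (ts < LATEST_TIMESTAMP) {
                ts++; // max is exclusive: include rows stamped at compile time
            }
            return ts;
        }

        public static void main(String[] args) {
            System.out.println(upperBound(null, 1700000000000L)); // 1700000000001
            System.out.println(upperBound(1700000000000L, 0L));   // 1700000000000
        }
    }
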
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
deleted file mode 100644
index 1aa3892..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import java.io.IOException;
-import java.sql.ParameterMetaData;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.phoenix.compile.ExplainPlan;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DelegateResultIterator;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.SQLCloseable;
-import org.apache.phoenix.util.SQLCloseables;
-import org.apache.phoenix.util.ScanUtil;
-
-import com.google.common.collect.Lists;
-
-
-
-/**
- *
- * Query plan that has no child plans
- *
- * 
- * @since 0.1
- */
-public abstract class BasicQueryPlan implements QueryPlan {
-    protected static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K
-    
-    protected final TableRef tableRef;
-    protected final StatementContext context;
-    protected final FilterableStatement statement;
-    protected final RowProjector projection;
-    protected final ParameterMetaData paramMetaData;
-    protected final Integer limit;
-    protected final OrderBy orderBy;
-    protected final GroupBy groupBy;
-    protected final ParallelIteratorFactory parallelIteratorFactory;
-
-    protected BasicQueryPlan(
-            StatementContext context, FilterableStatement statement, TableRef table,
-            RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
-            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
-        this.context = context;
-        this.statement = statement;
-        this.tableRef = table;
-        this.projection = projection;
-        this.paramMetaData = paramMetaData;
-        this.limit = limit;
-        this.orderBy = orderBy;
-        this.groupBy = groupBy;
-        this.parallelIteratorFactory = parallelIteratorFactory;
-    }
-
-    @Override
-    public boolean isDegenerate() {
-        return context.getScanRanges() == ScanRanges.NOTHING;
-
-    }
-    
-    @Override
-    public GroupBy getGroupBy() {
-        return groupBy;
-    }
-
-    
-    @Override
-    public OrderBy getOrderBy() {
-        return orderBy;
-    }
-
-    @Override
-    public TableRef getTableRef() {
-        return tableRef;
-    }
-
-    @Override
-    public Integer getLimit() {
-        return limit;
-    }
-
-    @Override
-    public RowProjector getProjector() {
-        return projection;
-    }
-
-//    /**
-//     * Sets up an id used to do round robin queue processing on the server
-//     * @param scan
-//     */
-//    private void setProducer(Scan scan) {
-//        byte[] producer = Bytes.toBytes(UUID.randomUUID().toString());
-//        scan.setAttribute(HBaseServer.CALL_QUEUE_PRODUCER_ATTRIB_NAME, producer);
-//    }
-    
-    @Override
-    public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList());
-    }
-
-    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
-        if (context.getScanRanges() == ScanRanges.NOTHING) {
-            return ResultIterator.EMPTY_ITERATOR;
-        }
-        
-        Scan scan = context.getScan();
-        // Set producer on scan so HBase server does round robin processing
-        //setProducer(scan);
-        // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
-        // The time stamp comes from the server at compile time when the meta data
-        // is resolved.
-        // TODO: include time range in explain plan?
-        PhoenixConnection connection = context.getConnection();
-        if (context.getScanTimeRange() == null) {
-          Long scn = connection.getSCN();
-          if (scn == null) {
-            scn = context.getCurrentTime();
-            // Add one to server time since max of time range is exclusive
-            // and we need to account of OSs with lower resolution clocks.
-            if (scn < HConstants.LATEST_TIMESTAMP) {
-              scn++;
-            }
-          }
-          ScanUtil.setTimeRange(scan, scn);
-        } else {
-          try {
-            scan.setTimeRange(context.getScanTimeRange().getMin(), context.getScanTimeRange().getMax());
-          } catch (IOException e) {
-            throw new SQLException(e);
-          }
-        }
-        ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes());
-        ResultIterator iterator = newIterator();
-        return dependencies.isEmpty() ? 
-                iterator : new DelegateResultIterator(iterator) {
-            @Override
-            public void close() throws SQLException {
-                try {
-                    super.close();
-                } finally {
-                    SQLCloseables.closeAll(dependencies);
-                }
-            }
-        };
-    }
-
-    abstract protected ResultIterator newIterator() throws SQLException;
-    
-    @Override
-    public long getEstimatedSize() {
-        return DEFAULT_ESTIMATED_SIZE;
-    }
-
-    @Override
-    public ParameterMetaData getParameterMetaData() {
-        return paramMetaData;
-    }
-
-    @Override
-    public FilterableStatement getStatement() {
-        return statement;
-    }
-
-    @Override
-    public StatementContext getContext() {
-        return context;
-    }
-
-    @Override
-    public ExplainPlan getExplainPlan() throws SQLException {
-        if (context.getScanRanges() == ScanRanges.NOTHING) {
-            return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + tableRef.getTable().getName().getString()));
-        }
-        
-        // Optimize here when getting explain plan, as queries don't get optimized until after compilation
-        QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
-        ResultIterator iterator = plan.iterator();
-        List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
-        iterator.explain(planSteps);
-        return new ExplainPlan(planSteps);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 0e8eb50..80c4727 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -30,7 +30,7 @@ import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.*;
 import org.apache.phoenix.schema.TableRef;
 
-public class DegenerateQueryPlan extends BasicQueryPlan {
+public class DegenerateQueryPlan extends BaseQueryPlan {
 
     public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
         super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 66ad235..5ffcaeb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -79,7 +79,7 @@ public class HashJoinPlan implements QueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
 
     private final FilterableStatement statement;
-    private final BasicQueryPlan plan;
+    private final BaseQueryPlan plan;
     private final HashJoinInfo joinInfo;
     private final SubPlan[] subPlans;
     private final boolean recompileWhereClause;
@@ -93,8 +93,8 @@ public class HashJoinPlan implements QueryPlan {
     
     public static HashJoinPlan create(FilterableStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
-        if (plan instanceof BasicQueryPlan)
-            return new HashJoinPlan(statement, (BasicQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
+        if (plan instanceof BaseQueryPlan)
+            return new HashJoinPlan(statement, (BaseQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
         
         assert (plan instanceof HashJoinPlan);
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
@@ -111,7 +111,7 @@ public class HashJoinPlan implements QueryPlan {
     }
     
     private HashJoinPlan(FilterableStatement statement, 
-            BasicQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
+            BaseQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
         this.statement = statement;
         this.plan = plan;
         this.joinInfo = joinInfo;
@@ -462,6 +462,11 @@ public class HashJoinPlan implements QueryPlan {
         }
         
     }
+
+    @Override
+    public boolean isRowKeyOrdered() {
+        return plan.isRowKeyOrdered();
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 03deca7..dfb8fec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -56,12 +56,12 @@ import org.apache.phoenix.util.ScanUtil;
  *
  * @since 0.1
  */
-public class ScanPlan extends BasicQueryPlan {
+public class ScanPlan extends BaseQueryPlan {
     private List<KeyRange> splits;
     private boolean allowPageFilter;
 
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
-        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null,
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
                         buildResultIteratorFactory(context, table, orderBy));
         this.allowPageFilter = allowPageFilter;
@@ -110,7 +110,7 @@ public class ScanPlan extends BasicQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        ParallelIterators iterators = new ParallelIterators(context, tableRef, statement, projection, GroupBy.EMPTY_GROUP_BY, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
+        ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
         splits = iterators.getSplits();
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
index 1bcfcd0..adeb2ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
@@ -105,7 +105,7 @@ public class SkipScanFilter extends FilterBase {
     }
 
     // Exposed for testing.
-    List<List<KeyRange>> getSlots() {
+    public List<List<KeyRange>> getSlots() {
         return slots;
     }
 
@@ -178,14 +178,35 @@ public class SkipScanFilter extends FilterBase {
             schema.next(ptr, 0, schema.iterator(upperExclusiveKey,ptr), slotSpan[0]);
             endPos = ScanUtil.searchClosestKeyRangeWithUpperHigherThanPtr(slots.get(0), ptr, startPos);
             // Upper range lower than first lower range of first slot, so cannot possibly be in range
-            if (endPos == 0 && Bytes.compareTo(upperExclusiveKey, slots.get(0).get(0).getLowerRange()) <= 0) {
-                return false;
-            }
+//            if (endPos == 0 && Bytes.compareTo(upperExclusiveKey, slots.get(0).get(0).getLowerRange()) <= 0) {
+//                return false;
+//            }
             // Past last position, so we can include everything from the start position
             if (endPos >= slots.get(0).size()) {
                 upperUnbound = true;
                 endPos = slots.get(0).size()-1;
+            } else if (slots.get(0).get(endPos).compareLowerToUpperBound(upperExclusiveKey) >= 0) {
+                // We know that the endPos range is higher than the previous range, but we need
+                // to test if it ends before the next range starts.
+                endPos--;
+            }
+            if (endPos < startPos) {
+                return false;
+            }
+            
+        }
+        // Short circuit out if we only have a single set of keys
+        if (slots.size() == 1) {
+//            int offset = slots.get(0).get(endPos).compareLowerToUpperBound(upperExclusiveKey) < 0 ? 1 : 0;
+//            if (endPos + offset <= startPos) {
+//                return false;
+//            }
+//            List<KeyRange> newRanges = slots.get(0).subList(startPos, endPos + offset);
+            if (newSlots != null) {
+                List<KeyRange> newRanges = slots.get(0).subList(startPos, endPos+1);
+                newSlots.add(newRanges);
             }
+            return true;
         }
         if (!lowerUnbound) {
             position[0] = startPos;

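The hasIntersect(...) change above replaces the old early return with an explicit endPos back-off: after locating the closest range, endPos must retreat when that range starts at or past the exclusive upper key, and the slot intersects only if endPos >= startPos. A standalone sketch over int ranges (hypothetical names; ints stand in for byte[] row keys, and linear scans stand in for the binary searches):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SlotIntersectSketch {
        // Ranges are [lower, upper) over ints, sorted and disjoint,
        // standing in for the first slot's sorted KeyRanges.
        static List<int[]> intersect(List<int[]> ranges, int lowerInc, int upperExc) {
            int startPos = 0;
            while (startPos < ranges.size() && ranges.get(startPos)[1] <= lowerInc) {
                startPos++; // skip ranges entirely below the lower key
            }
            int endPos = ranges.size() - 1;
            while (endPos >= 0 && ranges.get(endPos)[0] >= upperExc) {
                endPos--; // back off ranges starting at/after the exclusive upper key
            }
            if (endPos < startPos) {
                return Collections.emptyList(); // no possible intersection
            }
            return ranges.subList(startPos, endPos + 1);
        }

        public static void main(String[] args) {
            List<int[]> slot = Arrays.asList(new int[]{1, 3}, new int[]{5, 7}, new int[]{9, 11});
            // [6, 9) overlaps only [5, 7); [9, 11) starts at the exclusive upper key.
            for (int[] r : intersect(slot, 6, 9)) {
                System.out.println(r[0] + ".." + r[1]); // 5..7
            }
        }
    }
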
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index f486a47..8822e49 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -59,7 +59,6 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
         }
         Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
         ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-        scanRanges.setScanStartStopRow(scan);
         scan.setFilter(scanRanges.getSkipScanFilter());
         HRegion region = this.env.getRegion();
         RegionScanner scanner = region.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
deleted file mode 100644
index b8135d1..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-
-/**
- * Default strategy for splitting regions in ParallelIterator. Refactored from the
- * original version.
- * 
- * 
- * 
- */
-public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRegionSplitter {
-
-    protected final long guidePostsDepth;
-    protected final StatementContext context;
-    protected final PTable table;
-
-    private static final Logger logger = LoggerFactory.getLogger(DefaultParallelIteratorRegionSplitter.class);
-    public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) {
-        return new DefaultParallelIteratorRegionSplitter(context, table, hintNode);
-    }
-
-    protected DefaultParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) {
-        this.context = context;
-        this.table = table;
-        ReadOnlyProps props = context.getConnection().getQueryServices().getProps();
-        this.guidePostsDepth = props.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
-                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
-    }
-
-    // Get the mapping between key range and the regions that contains them.
-    protected List<HRegionLocation> getAllRegions() throws SQLException {
-        Scan scan = context.getScan();
-        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices()
-                .getAllTableRegions(table.getPhysicalName().getBytes());
-        // If we're not salting, then we've already intersected the minMaxRange with the scan range
-        // so there's nothing to do here.
-        return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
-    }
-
-    /**
-     * Filters out regions that intersect with key range specified by the startKey and stopKey
-     * @param allTableRegions all region infos for a given table
-     * @param startKey the lower bound of key range, inclusive
-     * @param stopKey the upper bound of key range, inclusive
-     * @return regions that intersect with the key range given by the startKey and stopKey
-     */
-    // exposed for tests
-    public static List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, byte[] startKey, byte[] stopKey) {
-        Iterable<HRegionLocation> regions;
-        final KeyRange keyRange = KeyRange.getKeyRange(startKey, true, stopKey, false);
-        if (keyRange == KeyRange.EVERYTHING_RANGE) {
-            return allTableRegions;
-        }
-        
-        regions = Iterables.filter(allTableRegions, new Predicate<HRegionLocation>() {
-            @Override
-            public boolean apply(HRegionLocation location) {
-                KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location
-                        .getRegionInfo().getEndKey());
-                return keyRange.intersect(regionKeyRange) != KeyRange.EMPTY_RANGE;
-            }
-        });
-        return Lists.newArrayList(regions);
-    }
-
-    protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
-        if (regions.isEmpty()) { return Collections.emptyList(); }
-        Scan scan = context.getScan();
-        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
-        List<byte[]> gps = Lists.newArrayList();
-        if (!ScanUtil.isAnalyzeTable(scan)) {
-            if (table.getColumnFamilies().isEmpty()) {
-                // For sure we can get the defaultCF from the table
-                gps = table.getGuidePosts();
-            } else {
-                try {
-                    if (scan.getFamilyMap().size() > 0) {
-                        if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
-                            gps = table.getColumnFamily(defaultCF).getGuidePosts();
-                        } else { // Otherwise, just use first CF in use by scan
-                            gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
-                        }
-                    } else {
-                        gps = table.getColumnFamily(defaultCF).getGuidePosts();
-                    }
-                } catch (ColumnFamilyNotFoundException cfne) {
-                    // Alter table does this
-                }
-            }
-
-        }
-
-        List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size());
-        byte[] currentKey = regions.get(0).getRegionInfo().getStartKey();
-        byte[] endKey = null;
-        int regionIndex = 0;
-        int guideIndex = 0;
-        int gpsSize = gps.size();
-        int regionSize = regions.size();
-        if (currentKey.length > 0) {
-            guideIndex = Collections.binarySearch(gps, currentKey, Bytes.BYTES_COMPARATOR);
-            guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
-        }
-        // Merge bisect with guideposts for all but the last region
-        while (regionIndex < regionSize) {
-            byte[] currentGuidePost;
-            currentKey = regions.get(regionIndex).getRegionInfo().getStartKey();
-            endKey = regions.get(regionIndex++).getRegionInfo().getEndKey();
-            while (guideIndex < gpsSize
-                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
-                KeyRange keyRange = KeyRange.getKeyRange(currentKey, currentGuidePost);
-                if (keyRange != KeyRange.EMPTY_RANGE) {
-                    guidePosts.add(keyRange);
-                }
-                currentKey = currentGuidePost;
-                guideIndex++;
-            }
-            KeyRange keyRange = KeyRange.getKeyRange(currentKey, endKey);
-            if (keyRange != KeyRange.EMPTY_RANGE) {
-                guidePosts.add(keyRange);
-            }
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("The captured guideposts are: " + guidePosts);
-        }
-        return guidePosts;
-    }
-
-    @Override
-    public List<KeyRange> getSplits() throws SQLException {
-        return genKeyRanges(getAllRegions());
-    }
-}

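For reference, the merge bisect in the deleted genKeyRanges(...) subdivided each region at every guidepost falling inside it, producing the chunks scanned in parallel. A standalone sketch with ints standing in for row keys (hypothetical names, not from this patch):

    import java.util.ArrayList;
    import java.util.List;

    public class MergeBisectSketch {
        public static void main(String[] args) {
            int[][] regions = {{0, 10}, {10, 20}}; // [startKey, endKey) per region
            int[] guideposts = {4, 8, 15};         // sorted guideposts
            List<int[]> chunks = new ArrayList<>();
            int guideIndex = 0;
            for (int[] region : regions) {
                int currentKey = region[0];
                int endKey = region[1];
                // Split the region at each guidepost it contains.
                while (guideIndex < guideposts.length && guideposts[guideIndex] <= endKey) {
                    if (guideposts[guideIndex] > currentKey) {
                        chunks.add(new int[]{currentKey, guideposts[guideIndex]});
                        currentKey = guideposts[guideIndex];
                    }
                    guideIndex++;
                }
                if (currentKey < endKey) { // remainder of the region
                    chunks.add(new int[]{currentKey, endKey});
                }
            }
            for (int[] c : chunks) {
                System.out.println(c[0] + ".." + c[1]); // 0..4, 4..8, 8..10, 10..15, 15..20
            }
        }
    }
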

[6/6] git commit: Merge branch '3.0' of https://git-wip-us.apache.org/repos/asf/phoenix into 3.0

Posted by ja...@apache.org.
Merge branch '3.0' of https://git-wip-us.apache.org/repos/asf/phoenix into 3.0


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/80e218c2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/80e218c2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/80e218c2

Branch: refs/heads/3.0
Commit: 80e218c2403bc7f057ea334293cf803306fb304b
Parents: 5f6f80b d3e6a9f
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Oct 3 20:56:10 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Oct 3 20:56:10 2014 -0700

----------------------------------------------------------------------
 .../phoenix/cache/aggcache/SpillFile.java       | 42 +++++++++++---------
 .../phoenix/cache/aggcache/SpillManager.java    |  6 ++-
 2 files changed, 28 insertions(+), 20 deletions(-)
----------------------------------------------------------------------



[2/6] PHOENIX-1251 Salted queries with range scan become full table scans

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 84ae243..40a0cff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -230,9 +230,11 @@ public abstract class ExplainTable {
     
     private void appendScanRow(StringBuilder buf, Bound bound) {
         ScanRanges scanRanges = context.getScanRanges();
-        KeyRange minMaxRange = context.getMinMaxRange();
+        // TODO: review this and potentially intersect the scan ranges
+        // with the minMaxRange in ScanRanges to prevent having to do all this.
+        KeyRange minMaxRange = scanRanges.getMinMaxRange();
         Iterator<byte[]> minMaxIterator = Iterators.emptyIterator();
-        if (minMaxRange != null) {
+        if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
             RowKeySchema schema = tableRef.getTable().getRowKeySchema();
             if (!minMaxRange.isUnbound(bound)) {
                 minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound));
@@ -262,8 +264,7 @@ public abstract class ExplainTable {
     
     private void appendKeyRanges(StringBuilder buf) {
         ScanRanges scanRanges = context.getScanRanges();
-        KeyRange minMaxRange = context.getMinMaxRange();
-        if (minMaxRange == null && (scanRanges == ScanRanges.EVERYTHING || scanRanges == ScanRanges.NOTHING)) {
+        if (scanRanges.isDegenerate() || scanRanges.isEverything()) {
             return;
         }
         buf.append(" [");

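The change above also reflects ScanRanges now always carrying a KeyRange, with KeyRange.EVERYTHING_RANGE serving as a shared sentinel in place of the old null, so callers can use an identity comparison. A tiny standalone sketch of that sentinel pattern (hypothetical names):

    public class SentinelSketch {
        static final int[] EVERYTHING_RANGE = new int[0]; // shared "unbounded" sentinel

        static boolean isBounded(int[] range) {
            // Identity check, like minMaxRange != KeyRange.EVERYTHING_RANGE above.
            return range != EVERYTHING_RANGE;
        }

        public static void main(String[] args) {
            System.out.println(isBounded(EVERYTHING_RANGE)); // false
            System.out.println(isBounded(new int[]{1, 9}));  // true
        }
    }
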
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
deleted file mode 100644
index 0448e46..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import java.sql.SQLException;
-
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.schema.PTable;
-
-
-/**
- * Factory class for the Region Splitter used by the project.
- */
-public class ParallelIteratorRegionSplitterFactory {
-
-    public static ParallelIteratorRegionSplitter getSplitter(StatementContext context, PTable table, HintNode hintNode) throws SQLException {
-        if (context.getScanRanges().useSkipScanFilter()) {
-            return SkipRangeParallelIteratorRegionSplitter.getInstance(context, table, hintNode);
-        }
-        return DefaultParallelIteratorRegionSplitter.getInstance(context, table, hintNode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index edab575..da8c212 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -34,26 +33,28 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
@@ -62,6 +63,7 @@ import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
@@ -71,6 +73,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 
@@ -84,7 +87,10 @@ import com.google.common.collect.Lists;
  */
 public class ParallelIterators extends ExplainTable implements ResultIterators {
 	private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
+    private final List<List<Scan>> scans;
     private final List<KeyRange> splits;
+    private final PTable physicalTable;
+    private final QueryPlan plan;
     private final ParallelIteratorFactory iteratorFactory;
     
     public static interface ParallelIteratorFactory {
@@ -92,6 +98,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
     }
 
     private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
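+    // Sizing hint only: presizes the flattened splits list built in the constructor.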
+    private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
 
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
         @Override
@@ -100,10 +107,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         }
     };
 
-    public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement,
-            RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
             throws SQLException {
-        super(context, tableRef, groupBy);
+        super(plan.getContext(), plan.getTableRef(), plan.getGroupBy());
+        this.plan = plan;
+        StatementContext context = plan.getContext();
+        TableRef tableRef = plan.getTableRef();
+        FilterableStatement statement = plan.getStatement();
+        RowProjector projector = plan.getProjector();
         MetaDataClient client = new MetaDataClient(context.getConnection());
         PTable physicalTable = tableRef.getTable();
         String physicalName = tableRef.getTable().getPhysicalName().getString();
@@ -122,8 +133,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
                         .getTable(new PTableKey(null, physicalTableName));
             }
         }
-        this.splits = getSplits(context, physicalTable, statement.getHint());
-        this.iteratorFactory = iteratorFactory;
+        this.physicalTable = physicalTable;
         Scan scan = context.getScan();
         PTable table = tableRef.getTable();
         if (projector.isProjectEmptyKeyValue()) {
@@ -148,17 +158,30 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
                 }
             }
         } else if (table.getViewType() == ViewType.MAPPED) {
-                // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
-                // selected column values are returned back to client
-                for (PColumnFamily family : table.getColumnFamilies()) {
-                    scan.addFamily(family.getName().getBytes());
-                }
-        } // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
-        if (limit != null) {
-            ScanUtil.andFilterAtEnd(scan, new PageFilter(limit));
+            // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
+            // selected column values are returned back to client
+            for (PColumnFamily family : table.getColumnFamilies()) {
+                scan.addFamily(family.getName().getBytes());
+            }
+        }
+        
+        // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
+        if (perScanLimit != null) {
+            ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
         }
 
         doColumnProjectionOptimization(context, scan, table, statement);
+        
+        this.iteratorFactory = iteratorFactory;
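+        // Compute the parallel scans once and derive the split key ranges from
+        // them, so that getSplits() reflects exactly the chunks that will execute.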
+        this.scans = getParallelScans(context.getScan());
+        List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
+        for (List<Scan> scanList : scans) {
+            for (Scan aScan : scanList) {
+                splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
+            }
+        }
+        this.splits = ImmutableList.copyOf(splitRanges);
     }
 
     private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
@@ -241,29 +264,210 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         }
     }
 
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+
+    private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
+        int nBoundaries = regionLocations.size() - 1;
+        List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
+        for (int i = 0; i < nBoundaries; i++) {
+            HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
+            ranges.add(regionInfo.getEndKey());
+        }
+        return ranges;
+    }
+    
+    private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index+1, as the inclusiveKey will be contained
+        // in the next region (since we're matching on the end boundary).
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+        return guideIndex;
+    }
+    
+    private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index we found as the exclusiveKey won't be
+        // contained in the next region as with getIndexContainingInclusive.
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
+        return guideIndex;
+    }
+    
+    private List<byte[]> getGuidePosts(PTable table) {
+        Scan scan = context.getScan();
+        boolean isPointLookup = context.getScanRanges().isPointLookup();
+        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
+        List<byte[]> gps = Collections.emptyList();
+        /*
+         *  Don't use guide posts if:
+         *  1) We're doing a point lookup, as HBase is fast enough at those
+         *     to not need them to be further parallelized. TODO: perf test to verify
+         *  2) We're collecting stats, as in this case we need to scan entire
+         *     regions worth of data to track where to put the guide posts.
+         */
+        if (!isPointLookup && !ScanUtil.isAnalyzeTable(scan)) {
+            if (table.getColumnFamilies().isEmpty()) {
+                // No column families defined, so use the table-level guide posts
+                return table.getGuidePosts();
+            }
+            try {
+                if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
+                    // If default CF is not used in scan, use first CF referenced in scan
+                    return table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+                }
+                // Otherwise, favor use of default CF.
+                return table.getColumnFamily(defaultCF).getGuidePosts();
+            } catch (ColumnFamilyNotFoundException cfne) {
+                // Alter table does this
+            }
+        }
+        return gps;
+    }
+    
+    private static String toString(List<byte[]> gps) {
+        if (gps.isEmpty()) {
+            return "[]";
+        }
+        StringBuilder buf = new StringBuilder(gps.size() * 100);
+        buf.append("[");
+        for (int i = 0; i < gps.size(); i++) {
+            buf.append(Bytes.toStringBinary(gps.get(i)));
+            buf.append(",");
+            if (i < gps.size()-1 && (i % 10) == 0) {
+                buf.append("\n");
+            }
+        }
+        buf.setCharAt(buf.length()-1, ']');
+        return buf.toString();
+    }
+    
+    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, boolean crossedRegionBoundary) {
+        if (scan == null) {
+            return scans;
+        }
+        if (!scans.isEmpty()) {
+            boolean startNewScanList = false;
+            if (!plan.isRowKeyOrdered()) {
+                startNewScanList = true;
+            } else if (crossedRegionBoundary) {
+                if (physicalTable.getBucketNum() != null) {
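+                    // For salted tables, a region can span salt buckets, and rows
+                    // in different buckets are not globally row key ordered, so a
+                    // new scan list starts whenever the salt prefix changes.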
+                    byte[] previousStartKey = scans.get(scans.size()-1).getStartRow();
+                    byte[] currentStartKey = scan.getStartRow();
+                    byte[] prefix = ScanUtil.getPrefix(previousStartKey, SaltingUtil.NUM_SALTING_BYTES);
+                    startNewScanList = ScanUtil.crossesPrefixBoundary(currentStartKey, prefix, SaltingUtil.NUM_SALTING_BYTES);
+                }
+            }
+            if (startNewScanList) {
+                parallelScans.add(scans);
+                scans = Lists.newArrayListWithExpectedSize(1);
+            }
+        }
+        scans.add(scan);
+        return scans;
+    }
     /**
-     * Splits the given scan's key range so that each split can be queried in parallel
-     * @param hintNode TODO
-     *
-     * @return the key ranges that should be scanned in parallel
+     * Compute the list of parallel scans to run for a given query. Scans within
+     * each inner list are adjacent in row key order and may be concatenated
+     * directly, while results across inner lists may need to be merge sorted,
+     * depending on the query.
+     * @return list of parallel scans to run for a given query.
+     * @throws SQLException
      */
-    // exposed for tests
-    public static List<KeyRange> getSplits(StatementContext context, PTable table, HintNode hintNode) throws SQLException {
-        return ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits();
+    private List<List<Scan>> getParallelScans(final Scan scan) throws SQLException {
+        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+                .getAllTableRegions(physicalTable.getPhysicalName().getBytes());
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        ScanRanges scanRanges = context.getScanRanges();
+        boolean isSalted = physicalTable.getBucketNum() != null;
+        List<byte[]> gps = getGuidePosts(physicalTable);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Guideposts: " + toString(gps));
+        }
+        boolean traverseAllRegions = isSalted;
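+        // A salted table spreads a logical key range across every salt bucket,
+        // so all regions must be visited; otherwise only the regions that the
+        // scan's start/stop keys intersect are considered.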
+        
+        byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] currentKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        int regionIndex = 0;
+        int stopIndex = regionBoundaries.size();
+        if (!traverseAllRegions) {
+            startKey = scan.getStartRow();
+            if (startKey.length > 0) {
+                currentKey = startKey;
+                regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+            }
+            stopKey = scan.getStopRow();
+            if (stopKey.length > 0) {
+                stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+            }
+        }
+        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+        
+        int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
+        int gpsSize = gps.size();
+        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
+        int keyOffset = 0;
+        List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
+        // Walk the regions in tandem with the guide posts, splitting each
+        // region at the guide posts that fall within it
+        while (regionIndex <= stopIndex) {
+            byte[] currentGuidePost;
+            byte[] endKey = regionIndex == stopIndex ? stopKey : regionBoundaries.get(regionIndex);
+            while (guideIndex < gpsSize
+                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+                Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset);
+                scans = addNewScan(parallelScans, scans, newScan, false);
+                currentKey = currentGuidePost;
+                guideIndex++;
+            }
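+            // Close out this region with a scan from the last key up to the
+            // region boundary (or the stop key for the final region); crossing
+            // a region boundary may start a new scan list in addNewScan.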
+            Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset);
+            scans = addNewScan(parallelScans, scans, newScan, true);
+            currentKey = endKey;
+            regionIndex++;
+        }
+        if (!scans.isEmpty()) { // Add any remaining scans
+            parallelScans.add(scans);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Computed parallel scans: " + parallelScans);
+        }
+        return parallelScans;
     }
 
-    private static List<KeyRange> toKeyRanges(List<HRegionLocation> regions) {
-        List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(regions.size());
-        for (HRegionLocation region : regions) {
-            keyRanges.add(TO_KEY_RANGE.apply(region));
+    private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
+        if (!concatIterators.isEmpty()) {
+            if (concatIterators.size() == 1) {
+                iterators.add(concatIterators.get(0));
+            } else {
+                // TODO: should ConcatResultIterator have a constructor that takes
+                // a List<PeekingResultIterator>?
+                iterators.add(new ConcatResultIterator(new ResultIterators() {
+    
+                    @Override
+                    public List<PeekingResultIterator> getIterators() throws SQLException {
+                        return concatIterators;
+                    }
+    
+                    @Override
+                    public int size() {
+                        return concatIterators.size();
+                    }
+    
+                    @Override
+                    public void explain(List<String> planSteps) {
+                        // TODO: review what we should do for the explain plan here
+                        concatIterators.get(0).explain(planSteps);
+                    }
+                    
+                }));
+            }
         }
-        return keyRanges;
     }
     
-    public List<KeyRange> getSplits() {
-        return splits;
+    public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
+        if (!reverse) {
+            return list;
+        }
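+        // Note: Guava's Lists.reverse returns a reversed view, not a copy.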
+        return Lists.reverse(list);
     }
-
+    
     /**
      * Executes the scan in parallel across all regions, blocking until all scans are complete.
      * @return the result iterators for the scan of each region
@@ -271,53 +475,54 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
     @Override
     public List<PeekingResultIterator> getIterators() throws SQLException {
         boolean success = false;
+        boolean isReverse = ScanUtil.isReversed(context.getScan());
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ReadOnlyProps props = services.getProps();
-        int numSplits = splits.size();
+        int numSplits = size();
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
-        List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits);
+        List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+        // TODO: what purpose does this scanID serve?
         final UUID scanId = UUID.randomUUID();
         try {
-            submitWork(scanId, splits, futures);
+            submitWork(scanId, scans, futures, splits.size());
             int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
-            final int factor = ScanUtil.isReversed(this.context.getScan()) ? -1 : 1;
-            // Sort futures by row key so that we have a predictable order we're getting rows back for scans.
-            // We're going to wait here until they're finished anyway and this makes testing much easier.
-            Collections.sort(futures, new Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() {
-                @Override
-                public int compare(Pair<KeyRange, Future<PeekingResultIterator>> o1, Pair<KeyRange, Future<PeekingResultIterator>> o2) {
-                    return factor * Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange());
-                }
-            });
             boolean clearedCache = false;
-            byte[] tableName = tableRef.getTable().getPhysicalName().getBytes();
-            for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
-                try {
-                    PeekingResultIterator iterator = future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                    iterators.add(iterator);
-                } catch (ExecutionException e) {
-                    try { // Rethrow as SQLException
-                        throw ServerUtil.parseServerException(e);
-                    } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
-                        List<Pair<KeyRange,Future<PeekingResultIterator>>> newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2);
-                        if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
-                            services.clearTableRegionCache(tableName);
-                            clearedCache = true;
-                        }
-                        List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName));
-                        // Intersect what was the expected boundary with all new region boundaries and
-                        // resubmit just this portion of work again
-                        List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits);
-                        submitWork(scanId, newSubSplits, newFutures);
-                        for (Pair<KeyRange,Future<PeekingResultIterator>> newFuture : newFutures) {
-                            // Immediate do a get (not catching exception again) and then add the iterators we
-                            // get back immediately. They'll be sorted as expected, since they're replacing the
-                            // original one.
-                            PeekingResultIterator iterator = newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                            iterators.add(iterator);
+            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
+                List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
+                for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
+                    try {
+                        PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                        concatIterators.add(iterator);
+                    } catch (ExecutionException e) {
+                        try { // Rethrow as SQLException
+                            throw ServerUtil.parseServerException(e);
+                        } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+                            List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
+                            if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+                                services.clearTableRegionCache(physicalTable.getName().getBytes());
+                                clearedCache = true;
+                            }
+                            // Resubmit just this portion of work again
+                            Scan oldScan = scanPair.getFirst();
+                            List<List<Scan>> newNestedScans = this.getParallelScans(oldScan);
+                            // Add any concatIterators that were successful so far
+                            // as we need these to be in order
+                            addConcatResultIterator(iterators, concatIterators);
+                            concatIterators = Collections.emptyList();
+                            submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
+                            for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
+                                for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
+                                    // Immediately do a get (not catching the exception again) and add
+                                    // the iterators we get back right away. They'll be sorted as
+                                    // expected, since they're replacing the original one.
+                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                                    iterators.add(iterator);
+                                }
+                            }
                         }
                     }
                 }
+                addConcatResultIterator(iterators, concatIterators);
             }
 
             success = true;
@@ -337,64 +542,80 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         }
     }
     
-    private void submitWork(final UUID scanId, List<KeyRange> splits,
-            List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) {
+    private static final class ScanLocation {
+        private final int outerListIndex;
+        private final int innerListIndex;
+        private final Scan scan;
+        public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) {
+            this.outerListIndex = outerListIndex;
+            this.innerListIndex = innerListIndex;
+            this.scan = scan;
+        }
+        public int getOuterListIndex() {
+            return outerListIndex;
+        }
+        public int getInnerListIndex() {
+            return innerListIndex;
+        }
+        public Scan getScan() {
+            return scan;
+        }
+    }
+    private void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+            List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ExecutorService executor = services.getExecutor();
-        for (final KeyRange split : splits) {
-            final Scan splitScan = ScanUtil.newScan(context.getScan());
-            // Intersect with existing start/stop key if the table is salted
-            // If not salted, we've already intersected it. If salted, we need
-            // to wait until now to intersect, as we're running parallel scans
-            // on all the possible regions here.
-            if (tableRef.getTable().getBucketNum() != null) {
-                KeyRange minMaxRange = context.getMinMaxRange();
-                if (minMaxRange != null) {
-                    // Add salt byte based on current split, as minMaxRange won't have it
-                    minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
-                    // FIXME: seems like this should be possible when we set the scan start/stop
-                    // in StatementContext.setScanRanges(). If it doesn't intersect the range for
-                    // one salt byte, I don't see how it could intersect it with any of them.
-                    if (!ScanUtil.intersectScanRange(splitScan, minMaxRange.getLowerRange(), minMaxRange.getUpperRange())) {
-                        continue; // Skip this chunk if no intersection based on minMaxRange
-                    }
-                }
+        // Pre-populate nestedFutures lists so that we can shuffle the scans
+        // and add the future to the right nested list. By shuffling the scans
+        // we get better utilization of the cluster, since the thread executor
+        // sprays the scans across region servers instead of targeting a single
+        // one, as it would if the scans were submitted in row key order.
+        List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
+        for (int i = 0; i < nestedScans.size(); i++) {
+            List<Scan> scans = nestedScans.get(i);
+            List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
+            nestedFutures.add(futures);
+            for (int j = 0; j < scans.size(); j++) {
+                Scan scan = scans.get(j);
+                scanLocations.add(new ScanLocation(scan, i, j));
+                futures.add(null); // placeholder
             }
-            if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
-                // Delay the swapping of start/stop row until row so we don't muck with the intersect logic
-                ScanUtil.swapStartStopRowIfReversed(splitScan);
-                Future<PeekingResultIterator> future =
-                    executor.submit(new JobCallable<PeekingResultIterator>() {
+        }
+        Collections.shuffle(scanLocations);
+        for (ScanLocation scanLocation : scanLocations) {
+            final Scan scan = scanLocation.getScan();
+            Future<PeekingResultIterator> future =
+                executor.submit(new JobCallable<PeekingResultIterator>() {
 
-                    @Override
-                    public PeekingResultIterator call() throws Exception {
-                        long startTime = System.currentTimeMillis();
-                        ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan);
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
-                        }
-                        return iteratorFactory.newIterator(context, scanner, splitScan);
+                @Override
+                public PeekingResultIterator call() throws Exception {
+                    long startTime = System.currentTimeMillis();
+                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan);
                     }
+                    return iteratorFactory.newIterator(context, scanner, scan);
+                }
 
-                    /**
-                     * Defines the grouping for round robin behavior.  All threads spawned to process
-                     * this scan will be grouped together and time sliced with other simultaneously
-                     * executing parallel scans.
-                     */
-                    @Override
-                    public Object getJobId() {
-                        return ParallelIterators.this;
-                    }
-                });
-                futures.add(new Pair<KeyRange,Future<PeekingResultIterator>>(split,future));
-            }
+                /**
+                 * Defines the grouping for round robin behavior.  All threads spawned to process
+                 * this scan will be grouped together and time sliced with other simultaneously
+                 * executing parallel scans.
+                 */
+                @Override
+                public Object getJobId() {
+                    return ParallelIterators.this;
+                }
+            });
+            // Add our future in the right place so that we can concatenate the
+            // results of the inner futures versus merge sorting across all of them.
+            nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(), new Pair<Scan,Future<PeekingResultIterator>>(scan,future));
         }
-
     }
 
     @Override
     public int size() {
-        return this.splits.size();
+        return this.scans.size();
     }
 
     @Override
@@ -403,4 +624,9 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         buf.append("CLIENT PARALLEL " + size() + "-WAY ");
         explain(buf.toString(),planSteps);
     }
+
+    @Override
+    public String toString() {
+        return "ParallelIterators [scans=" + scans + "]";
+    }
 }
\ No newline at end of file

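The submitWork implementation above flattens the nested scans together with
their (outer, inner) coordinates, shuffles that flat list, submits each scan,
and writes the resulting future back into its original slot, so results can
still be concatenated (within an inner list) or merge sorted (across inner
lists) in order. A standalone sketch of the shuffle-and-reassemble pattern,
with strings standing in for scans and the asynchronous execution elided:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ShuffleSubmitSketch {
        // Carries a work item together with its position in the nested lists.
        static final class Location {
            final int outer, inner;
            final String work;
            Location(String work, int outer, int inner) {
                this.work = work; this.outer = outer; this.inner = inner;
            }
        }

        public static void main(String[] args) {
            List<List<String>> nestedWork = Arrays.asList(
                    Arrays.asList("scan-0-0", "scan-0-1"),
                    Arrays.asList("scan-1-0"));
            // Pre-populate the result structure with placeholders so results
            // can be written back to the right slot after shuffling.
            List<List<String>> nestedResults = new ArrayList<>();
            List<Location> flattened = new ArrayList<>();
            for (int i = 0; i < nestedWork.size(); i++) {
                List<String> results = new ArrayList<>();
                nestedResults.add(results);
                for (int j = 0; j < nestedWork.get(i).size(); j++) {
                    flattened.add(new Location(nestedWork.get(i).get(j), i, j));
                    results.add(null); // placeholder
                }
            }
            // Shuffle so work is not submitted in (row key) order.
            Collections.shuffle(flattened);
            for (Location loc : flattened) {
                String result = loc.work.toUpperCase(); // stand-in for the async scan
                nestedResults.get(loc.outer).set(loc.inner, result);
            }
            System.out.println(nestedResults); // [[SCAN-0-0, SCAN-0-1], [SCAN-1-0]]
        }
    }
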
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
deleted file mode 100644
index 81f5af6..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SaltingUtil;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-
-/**
- * Split the region according to the information contained in the scan's SkipScanFilter.
- */
-public class SkipRangeParallelIteratorRegionSplitter extends DefaultParallelIteratorRegionSplitter {
-
-    public static SkipRangeParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) {
-        return new SkipRangeParallelIteratorRegionSplitter(context, table, hintNode);
-    }
-
-    protected SkipRangeParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) {
-        super(context, table, hintNode);
-    }
-
-    @Override
-    protected List<HRegionLocation> getAllRegions() throws SQLException {
-        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes());
-        return filterRegions(allTableRegions, context.getScanRanges());
-    }
-
-    public List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, final ScanRanges ranges) {
-        Iterable<HRegionLocation> regions;
-        if (ranges == ScanRanges.EVERYTHING) {
-            return allTableRegions;
-        } else if (ranges == ScanRanges.NOTHING) { // TODO: why not emptyList?
-            return Lists.<HRegionLocation>newArrayList();
-        } else {
-            regions = Iterables.filter(allTableRegions,
-                    new Predicate<HRegionLocation>() {
-                    @Override
-                    public boolean apply(HRegionLocation region) {
-                        KeyRange minMaxRange = context.getMinMaxRange();
-                        if (minMaxRange != null) {
-                            KeyRange range = KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
-                            if (table.getBucketNum() != null) {
-                                // Add salt byte, as minMaxRange won't have it
-                                minMaxRange = SaltingUtil.addSaltByte(region.getRegionInfo().getStartKey(), minMaxRange);
-                            }
-                            range = range.intersect(minMaxRange);
-                            return ranges.intersect(range.getLowerRange(), range.getUpperRange());
-                        }
-                        return ranges.intersect(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
-                    }
-            });
-        }
-        return Lists.newArrayList(regions);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 2840dca..e396b22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -426,6 +426,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public boolean isDegenerate() {
                     return false;
                 }
+
+                @Override
+                public boolean isRowKeyOrdered() {
+                    return true;
+                }
                 
             };
         }

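The region lookups added to ParallelIterators above (getIndexContainingInclusive
and getIndexContainingExclusive) hinge on Collections.binarySearch returning
-(insertionPoint) - 1 on a miss. A standalone sketch of the same arithmetic,
using strings in place of byte[] region end keys:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class BoundaryLookupSketch {
        // Inclusive lookup: an exact match on a region end key belongs to the
        // next region, hence index + 1 on a hit; a miss decodes binarySearch's
        // -(insertionPoint) - 1 back to the insertion point.
        static int indexContainingInclusive(List<String> endKeys, String key) {
            int i = Collections.binarySearch(endKeys, key);
            return i < 0 ? -(i + 1) : i + 1;
        }

        // Exclusive lookup: an exact match stays at the found index, since the
        // key itself is not contained in the range it terminates.
        static int indexContainingExclusive(List<String> endKeys, String key) {
            int i = Collections.binarySearch(endKeys, key);
            return i < 0 ? -(i + 1) : i;
        }

        public static void main(String[] args) {
            // Three end keys carve the key space into four regions: [,d) [d,m) [m,t) [t,)
            List<String> endKeys = Arrays.asList("d", "m", "t");
            System.out.println(indexContainingInclusive(endKeys, "a")); // 0
            System.out.println(indexContainingInclusive(endKeys, "d")); // 1: "d" starts region 1
            System.out.println(indexContainingExclusive(endKeys, "d")); // 0: stop key "d" ends region 0
            System.out.println(indexContainingInclusive(endKeys, "z")); // 3
        }
    }
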
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
index 68f786a..afcc741 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
@@ -210,6 +210,10 @@ public class KeyRange implements Writable {
         return compareLowerToUpperBound(b,o,l,true);
     }
 
+    public int compareLowerToUpperBound(byte[] b) {
+        return compareLowerToUpperBound(b,0,b.length);
+    }
+
     /**
      * Compares a lower bound against an upper bound
      * @param b upper bound byte array
@@ -237,6 +241,10 @@ public class KeyRange implements Writable {
         return 1;
     }
     
+    public int compareUpperToLowerBound(byte[] b) {
+        return compareUpperToLowerBound(b,0,b.length);
+    }
+    
     public int compareUpperToLowerBound(byte[] b, int o, int l) {
         return compareUpperToLowerBound(b,o,l, true);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
deleted file mode 100644
index df55fb5..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.query;
-
-import java.sql.SQLException;
-
-import org.apache.phoenix.schema.TableRef;
-
-
-/**
- * 
- * Interface for managing and caching table statistics.
- * The frequency of updating the table statistics are controlled
- * by {@link org.apache.phoenix.query.QueryServices#STATS_UPDATE_FREQ_MS_ATTRIB}.
- * Table stats may also be manually updated through {@link #updateStats(TableRef)}.
- * 
- *
- * 
- * @since 0.1
- */
-public interface StatsManager {
-    /**
-     * Get the minimum key for the given table
-     * @param table the table
-     * @return the minimum key or null if unknown
-     */
-    byte[] getMinKey(TableRef table);
-    
-    /**
-     * Get the maximum key for the given table
-     * @param table the table
-     * @return the maximum key or null if unknown
-     */
-    byte[] getMaxKey(TableRef table);
-    
-    /**
-     * Manually update the cached table statistics
-     * @param table the table
-     * @throws SQLException
-     */
-    void updateStats(TableRef table) throws SQLException;
-    
-    void clearStats() throws SQLException;
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
deleted file mode 100644
index 1ab7df5..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.query;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TimeKeeper;
-
-
-/**
- * 
- * Implementation of StatsManager. Table stats are updated asynchronously when they're
- * accessed and past time-to-live. In this case, future calls (after the asynchronous
- * call has completed), will have the updated stats.
- * 
- * All tables share the same HBase connection for a given connection and each connection
- * will have it's own cache for these stats. This isn't ideal and will get reworked when
- * the schema is kept on the server side. It's ok for now because:
- * 1) we only ask the server for these stats when the start/end region is queried against
- * 2) the query to get the stats pulls a single row so it's very cheap
- * 3) it's async and if it takes too long it won't lead to anything except less optimal
- *  parallelization.
- *
- * 
- * @since 0.1
- */
-public class StatsManagerImpl implements StatsManager {
-    private final ConnectionQueryServices services;
-    private final int statsUpdateFrequencyMs;
-    private final int maxStatsAgeMs;
-    private final TimeKeeper timeKeeper;
-    private final ConcurrentMap<String,PTableStats> tableStatsMap = new ConcurrentHashMap<String,PTableStats>();
-
-    public StatsManagerImpl(ConnectionQueryServices services, int statsUpdateFrequencyMs, int maxStatsAgeMs) {
-        this(services, statsUpdateFrequencyMs, maxStatsAgeMs, TimeKeeper.SYSTEM);
-    }
-    
-    public StatsManagerImpl(ConnectionQueryServices services, int statsUpdateFrequencyMs, int maxStatsAgeMs, TimeKeeper timeKeeper) {
-        this.services = services;
-        this.statsUpdateFrequencyMs = statsUpdateFrequencyMs;
-        this.maxStatsAgeMs = maxStatsAgeMs;
-        this.timeKeeper = timeKeeper;
-    }
-    
-    public long getStatsUpdateFrequency() {
-        return statsUpdateFrequencyMs;
-    }
-    
-    @Override
-    public void updateStats(TableRef tableRef) throws SQLException {
-        SQLException sqlE = null;
-        HTableInterface hTable = services.getTable(tableRef.getTable().getPhysicalName().getBytes());
-        try {
-            byte[] minKey = null, maxKey = null;
-            // Do a key-only scan to get the first row of a table. This is the min
-            // key for the table.
-            Scan scan = new Scan(HConstants.EMPTY_START_ROW, new KeyOnlyFilter());
-            ResultScanner scanner = hTable.getScanner(scan);
-            try {
-                Result r = scanner.next(); 
-                if (r != null) {
-                    minKey = r.getRow();
-                }
-            } finally {
-                scanner.close();
-            }
-            int maxPossibleKeyLength = SchemaUtil.estimateKeyLength(tableRef.getTable());
-            byte[] maxPossibleKey = new byte[maxPossibleKeyLength];
-            Arrays.fill(maxPossibleKey, (byte)255);
-            // Use this deprecated method to get the key "before" the max possible key value,
-            // which is the max key for a table.
-            @SuppressWarnings("deprecation")
-            Result r = hTable.getRowOrBefore(maxPossibleKey, tableRef.getTable().getColumnFamilies().iterator().next().getName().getBytes());
-            if (r != null) {
-                maxKey = r.getRow();
-            }
-            tableStatsMap.put(tableRef.getTable().getName().getString(), new PTableStats(timeKeeper.getCurrentTime(),minKey,maxKey));
-        } catch (IOException e) {
-            sqlE = ServerUtil.parseServerException(e);
-        } finally {
-            try {
-                hTable.close();
-            } catch (IOException e) {
-                if (sqlE == null) {
-                    sqlE = ServerUtil.parseServerException(e);
-                } else {
-                    sqlE.setNextException(ServerUtil.parseServerException(e));
-                }
-            } finally {
-                if (sqlE != null) {
-                    throw sqlE;
-                }
-            }
-        }
-    }
-    
-    private PTableStats getStats(final TableRef table) {
-        PTableStats stats = tableStatsMap.get(table);
-        if (stats == null) {
-            PTableStats newStats = new PTableStats();
-            stats = tableStatsMap.putIfAbsent(table.getTable().getName().getString(), newStats);
-            stats = stats == null ? newStats : stats;
-        }
-        // Synchronize on the current stats for a table to prevent
-        // multiple attempts to update the stats.
-        synchronized (stats) {
-            long initiatedTime = stats.getInitiatedTime();
-            long currentTime = timeKeeper.getCurrentTime();
-            // Update stats asynchronously if they haven't been updated within the specified frequency.
-            // We update asynchronously because we don't ever want to block the caller - instead we'll continue
-            // to use the old one.
-            if ( currentTime - initiatedTime >= getStatsUpdateFrequency()) {
-                stats.setInitiatedTime(currentTime);
-                services.getExecutor().submit(new Callable<Void>() {
-
-                    @Override
-                    public Void call() throws Exception { // TODO: will exceptions be logged?
-                        updateStats(table);
-                        return null;
-                    }
-                    
-                });
-            }
-            // If the stats are older than the max age, use an empty stats
-            if (currentTime - stats.getCompletedTime() >= maxStatsAgeMs) {
-                return PTableStats.NO_STATS;
-            }
-        }
-        return stats;
-    }
-    
-    @Override
-    public byte[] getMinKey(TableRef table) {
-        PTableStats stats = getStats(table);
-        return stats.getMinKey();
-    }
-
-    @Override
-    public byte[] getMaxKey(TableRef table) {
-        PTableStats stats = getStats(table);
-        return stats.getMaxKey();
-    }
-
-    private static class PTableStats {
-        private static final PTableStats NO_STATS = new PTableStats();
-        private long initiatedTime;
-        private final long completedTime;
-        private final byte[] minKey;
-        private final byte[] maxKey;
-        
-        public PTableStats() {
-            this(-1,null,null);
-        }
-        public PTableStats(long completedTime, byte[] minKey, byte[] maxKey) {
-            this.minKey = minKey;
-            this.maxKey = maxKey;
-            this.completedTime = this.initiatedTime = completedTime;
-        }
-
-        private byte[] getMinKey() {
-            return minKey;
-        }
-
-        private byte[] getMaxKey() {
-            return maxKey;
-        }
-
-        private long getCompletedTime() {
-            return completedTime;
-        }
-
-        private void setInitiatedTime(long initiatedTime) {
-            this.initiatedTime = initiatedTime;
-        }
-
-        private long getInitiatedTime() {
-            return initiatedTime;
-        }
-    }
-    
-    @Override
-    public void clearStats() throws SQLException {
-        tableStatsMap.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
index f9347a4..536e5bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
@@ -54,8 +55,7 @@ public class StatisticsCollector {
     private Map<String, byte[]> minMap = Maps.newHashMap();
     private Map<String, byte[]> maxMap = Maps.newHashMap();
     private long guidepostDepth;
-    private long byteCount = 0;
-    private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap();
+    private Map<String, Pair<Integer,List<byte[]>>> guidePostsMap = Maps.newHashMap();
     private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
     protected StatisticsTable statsTable;
     private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
@@ -131,7 +131,6 @@ public class StatisticsCollector {
         List<KeyValue> results = new ArrayList<KeyValue>();
         boolean hasMore = true;
         while (hasMore) {
-            // Am getting duplicates here. Need to avoid that
             hasMore = scanner.next(results);
             collectStatistics(results);
             count += results.size();
@@ -280,18 +279,21 @@ public class StatisticsCollector {
                 maxMap.put(fam, row);
             }
         }
-        byteCount += kv.getLength();
         // TODO : This can be moved to an interface so that we could collect guide posts in different ways
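+        // Track the byte count per column family (the Pair's first member)
+        // rather than in a single shared counter, so guide post depth is
+        // measured independently for each family.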
+        Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
+        if (gps == null) {
+            gps = new Pair<Integer,List<byte[]>>(0, Lists.<byte[]>newArrayList());
+            guidePostsMap.put(fam, gps);
+        }
+        int byteCount = gps.getFirst() + kv.getLength();
+        gps.setFirst(byteCount);
         if (byteCount >= guidepostDepth) {
-            if (guidePostsMap.get(fam) != null) {
-                guidePostsMap.get(fam).add(row);
-            } else {
-                List<byte[]> guidePosts = new ArrayList<byte[]>();
-                guidePosts.add(row);
-                guidePostsMap.put(fam, guidePosts);
+            // Prevent duplicate guide posts: only add a key that sorts after the last one
+            List<byte[]> gpsKeys = gps.getSecond();
+            if (gpsKeys.isEmpty() || Bytes.compareTo(row, gpsKeys.get(gpsKeys.size()-1)) > 0) {
+                gpsKeys.add(row);
+                gps.setFirst(0); // Only reset count when adding guidepost
             }
-            // reset the count for the next key
-            byteCount = 0;
         }
     }
 
@@ -307,16 +309,19 @@ public class StatisticsCollector {
 
     public byte[] getGuidePosts(String fam) {
         if (!guidePostsMap.isEmpty()) {
-            List<byte[]> guidePosts = guidePostsMap.get(fam);
-            if (guidePosts != null) {
-                byte[][] array = new byte[guidePosts.size()][];
-                int i = 0;
-                for (byte[] element : guidePosts) {
-                    array[i] = element;
-                    i++;
+            Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
+            if (gps != null) {
+                List<byte[]> guidePosts = gps.getSecond();
+                if (!guidePosts.isEmpty()) {
+                    byte[][] array = new byte[guidePosts.size()][];
+                    int i = 0;
+                    for (byte[] element : guidePosts) {
+                        array[i] = element;
+                        i++;
+                    }
+                    PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array);
+                    return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
                 }
-                PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array);
-                return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
             }
         }
         return null;

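The collectStatistics change above accumulates bytes per family and emits a
guide post key once the configured depth is reached, resetting the counter only
when a key is actually added, so duplicate or out-of-order row keys don't
silently lose depth tracking. A standalone sketch of that accumulation rule,
with strings standing in for row keys and a fixed per-cell size:

    import java.util.ArrayList;
    import java.util.List;

    public class GuidePostDepthSketch {
        public static void main(String[] args) {
            long depth = 10;          // emit a guide post roughly every `depth` bytes
            long byteCount = 0;       // bytes seen since the last guide post
            List<String> guidePosts = new ArrayList<>();
            String[] rows = { "a", "b", "b", "c" };
            int bytesPerRow = 6;      // pretend every cell is 6 bytes
            for (String row : rows) {
                byteCount += bytesPerRow;
                if (byteCount >= depth) {
                    String last = guidePosts.isEmpty()
                            ? null : guidePosts.get(guidePosts.size() - 1);
                    if (last == null || row.compareTo(last) > 0) { // prevent dups
                        guidePosts.add(row);
                        byteCount = 0; // reset only when a guide post was added
                    }
                }
            }
            System.out.println(guidePosts); // [b, c]
        }
    }
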
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index 2ea8a13..5561002 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ByteUtil;
 
 /**
  * Wrapper to access the statistics table SYSTEM.STATS using the HTable.
@@ -56,14 +57,12 @@ public class StatisticsTable implements Closeable {
         if (table == null) {
             // Map the statics table and the table with which the statistics is
             // associated. This is a workaround
-            HTablePool pool = new HTablePool(conf, 1);
-            try {
-                HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
-                table = new StatisticsTable(hTable);
-                tableMap.put(primaryTableName, table);
-            } finally {
-                pool.close();
-            }
+            HTablePool pool = new HTablePool(conf, 100);
+            HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+            table = new StatisticsTable(hTable);
+            tableMap.put(primaryTableName, table);
         }
         return table;
     }
@@ -132,6 +131,9 @@ public class StatisticsTable implements Closeable {
                 currentTime, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
                 currentTime, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+        // Add our empty column value so queries behave correctly
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                currentTime, ByteUtil.EMPTY_BYTE_ARRAY);
         mutations.add(put);
     }
 

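The added lines give every SYSTEM.STATS row Phoenix's empty column marker. Phoenix scans always read the empty column, so without it a stats row whose other columns happen to be absent would be invisible to queries. A sketch of the same pattern, reusing only the constants and the HBase Put.add signature visible in the diff (the wrapper method is hypothetical):

import org.apache.hadoop.hbase.client.Put;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.ByteUtil;

public class EmptyColumnSketch {
    // Attach Phoenix's empty column marker to a stats row. Phoenix scans always
    // project the empty column, so a row missing it would be skipped by queries.
    static Put withEmptyColumn(byte[] rowKey, long timestamp) {
        Put put = new Put(rowKey);
        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
                timestamp, ByteUtil.EMPTY_BYTE_ARRAY);
        return put;
    }
}
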
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 0d69c8b..c0da0bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -41,7 +41,6 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
 
 import com.google.common.collect.Lists;
@@ -55,8 +54,8 @@ import com.google.common.collect.Lists;
  * @since 0.1
  */
 public class ScanUtil {
-
     public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1];
+    private static final byte[] ZERO_BYTE_ARRAY = new byte[1024];
 
     private ScanUtil() {
     }
@@ -210,7 +209,7 @@ public class ScanUtil {
 
     private static byte[] getKey(RowKeySchema schema, List<List<KeyRange>> slots, int[] slotSpan, Bound bound) {
         if (slots.isEmpty()) {
-            return null;
+            return KeyRange.UNBOUND;
         }
         int[] position = new int[slots.size()];
         int maxLength = 0;
@@ -222,7 +221,7 @@ public class ScanUtil {
         byte[] key = new byte[maxLength];
         int length = setKey(schema, slots, slotSpan, position, bound, key, 0, 0, position.length);
         if (length == 0) {
-            return null;
+            return KeyRange.UNBOUND;
         }
         if (length == maxLength) {
             return key;
@@ -385,9 +384,35 @@ public class ScanUtil {
         return keyRanges;
     }
 
-    public static byte[] nextKey(byte[] key, PTable table, ImmutableBytesWritable ptr) {
+    /**
+     * Converts a partially qualified KeyRange into a KeyRange with an
+     * inclusive lower bound and an exclusive upper bound, widening
+     * as necessary.
+     */
+    public static KeyRange convertToInclusiveExclusiveRange (KeyRange partialRange, RowKeySchema schema, ImmutableBytesWritable ptr) {
+        // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
+        // what we need to intersect against for the HBase scan.
+        byte[] lowerRange = partialRange.getLowerRange();
+        if (!partialRange.lowerUnbound()) {
+            if (!partialRange.isLowerInclusive()) {
+                lowerRange = ScanUtil.nextKey(lowerRange, schema, ptr);
+            }
+        }
+        
+        byte[] upperRange = partialRange.getUpperRange();
+        if (!partialRange.upperUnbound()) {
+            if (partialRange.isUpperInclusive()) {
+                upperRange = ScanUtil.nextKey(upperRange, schema, ptr);
+            }
+        }
+        if (partialRange.getLowerRange() != lowerRange || partialRange.getUpperRange() != upperRange) {
+            partialRange = KeyRange.getKeyRange(lowerRange, upperRange);
+        }
+        return partialRange;
+    }
+    
+    private static byte[] nextKey(byte[] key, RowKeySchema schema, ImmutableBytesWritable ptr) {
         int pos = 0;
-        RowKeySchema schema = table.getRowKeySchema();
         int maxOffset = schema.iterator(key, ptr);
         while (schema.next(ptr, pos, maxOffset) != null) {
             pos++;
@@ -425,7 +450,7 @@ public class ScanUtil {
         byte[] reversed = scan.getAttribute(REVERSED_ATTR);
         return (PDataType.TRUE_BYTES.equals(reversed));
     }
-
+        
     public static int[] getDefaultSlotSpans(int nSlots) {
         return new int[nSlots];
     }
@@ -467,4 +492,31 @@ public class ScanUtil {
     public static boolean isAnalyzeTable(Scan scan) {
         return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null;
     }
+
+    public static boolean crossesPrefixBoundary(byte[] key, byte[] prefixBytes, int prefixLength) {
+        if (key.length < prefixLength) {
+            return true;
+        }
+        if (prefixBytes.length >= prefixLength) {
+            return Bytes.compareTo(prefixBytes, 0, prefixLength, key, 0, prefixLength) != 0;
+        }
+        return hasNonZeroLeadingBytes(key, prefixLength);
+    }
+
+    public static byte[] getPrefix(byte[] startKey, int prefixLength) {
+        // If startKey is at beginning, then our prefix will be a null padded byte array
+        return startKey.length >= prefixLength ? startKey : ByteUtil.EMPTY_BYTE_ARRAY;
+    }
+
+    private static boolean hasNonZeroLeadingBytes(byte[] key, int nBytesToCheck) {
+        if (nBytesToCheck > ZERO_BYTE_ARRAY.length) {
+            do {
+                if (Bytes.compareTo(key, nBytesToCheck - ZERO_BYTE_ARRAY.length, ZERO_BYTE_ARRAY.length, ScanUtil.ZERO_BYTE_ARRAY, 0, ScanUtil.ZERO_BYTE_ARRAY.length) != 0) {
+                    return true;
+                }
+                nBytesToCheck -= ZERO_BYTE_ARRAY.length;
+            } while (nBytesToCheck > ZERO_BYTE_ARRAY.length);
+        }
+        return Bytes.compareTo(key, 0, nBytesToCheck, ZERO_BYTE_ARRAY, 0, nBytesToCheck) != 0;
+    }
 }
\ No newline at end of file

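convertToInclusiveExclusiveRange() above normalizes a KeyRange to the [inclusive, exclusive) form an HBase scan expects. A self-contained toy model of that contract, assuming plain byte-array keys and a simplified nextKey that appends 0x00 (Phoenix's real nextKey, shown above, pads according to the row key schema):

import java.util.Arrays;

public class RangeNormalizationSketch {
    // The smallest byte string strictly greater than key, in unsigned
    // lexicographic order, is key with a 0x00 byte appended.
    static byte[] nextKey(byte[] key) {
        return Arrays.copyOf(key, key.length + 1);
    }

    // Normalize to the [inclusive, exclusive) form an HBase scan expects:
    // bump an exclusive lower bound up to the next key, and bump an inclusive
    // upper bound up so it becomes exclusive.
    static byte[][] toInclusiveExclusive(byte[] lower, boolean lowerInclusive,
                                         byte[] upper, boolean upperInclusive) {
        byte[] start = lowerInclusive ? lower : nextKey(lower);
        byte[] stop = upperInclusive ? nextKey(upper) : upper;
        return new byte[][] { start, stop };
    }
}

The crossesPrefixBoundary() helper added in the same file serves the salted case: it reports whether a key has moved outside the current salt-byte prefix, treating a key shorter than the prefix as crossing and a null-padded prefix as all zero bytes.
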
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
new file mode 100644
index 0000000..be90399
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesIntersectTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.ScanUtil;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class ScanRangesIntersectTest {
+
+    @Test
+    public void testPointLookupIntersect() throws Exception {
+        RowKeySchema schema = schema();
+        int[] slotSpan = ScanUtil.SINGLE_COLUMN_SLOT_SPAN;
+        List<KeyRange> keys = points("a","j","m","z");
+        ScanRanges ranges = ScanRanges.create(schema, Collections.singletonList(keys), slotSpan);
+        assertIntersect(ranges, "b", "l", "j");
+        
+    }
+    
+    private static void assertIntersect(ScanRanges ranges, String lowerRange, String upperRange, String... expectedPoints) {
+        List<KeyRange> expectedKeys = points(expectedPoints);
+        Collections.sort(expectedKeys,KeyRange.COMPARATOR);
+        Scan scan = new Scan();
+        scan.setFilter(ranges.getSkipScanFilter());
+        byte[] startKey = lowerRange == null ? KeyRange.UNBOUND : PDataType.VARCHAR.toBytes(lowerRange);
+        byte[] stopKey = upperRange == null ? KeyRange.UNBOUND : PDataType.VARCHAR.toBytes(upperRange);
+        Scan newScan = ranges.intersectScan(scan, startKey, stopKey, 0);
+        if (expectedPoints.length == 0) {
+            assertNull(newScan);
+        } else {
+            assertNotNull(newScan);
+            SkipScanFilter filter = (SkipScanFilter)newScan.getFilter();
+            assertEquals(expectedKeys, filter.getSlots().get(0));
+        }
+    }
+    
+    private static List<KeyRange> points(String... points) {
+        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(points.length);
+        for (String point : points) {
+            keys.add(KeyRange.getKeyRange(PDataType.VARCHAR.toBytes(point)));
+        }
+        return keys;
+    }
+    
+    private static RowKeySchema schema() {
+        RowKeySchemaBuilder builder = new RowKeySchemaBuilder(1);
+        builder.addField(new PDatum() {
+            @Override
+            public boolean isNullable() {
+                return false;
+            }
+            @Override
+            public PDataType getDataType() {
+                return PDataType.VARCHAR;
+            }
+            @Override
+            public Integer getMaxLength() {
+                return null;
+            }
+            @Override
+            public Integer getScale() {
+                return null;
+            }
+            @Override
+            public SortOrder getSortOrder() {
+                return SortOrder.getDefault();
+            }
+        }, false, SortOrder.getDefault());
+        return builder.build();
+    }
+}

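The new test pins down point/range intersection: the points 'a','j','m','z' intersected with the region boundary range [b, l) must leave exactly 'j'. A toy model of that intersection over unsigned lexicographic byte order, with empty arrays standing in for KeyRange.UNBOUND as in the test:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;

public class PointIntersectSketch {
    // Keep only the point keys that fall inside [start, stop); an empty
    // start or stop array means that side is unbounded.
    static List<byte[]> intersect(List<byte[]> points, byte[] start, byte[] stop) {
        List<byte[]> result = new ArrayList<byte[]>();
        for (byte[] p : points) {
            boolean aboveStart = start.length == 0 || Bytes.compareTo(p, start) >= 0;
            boolean belowStop = stop.length == 0 || Bytes.compareTo(p, stop) < 0;
            if (aboveStart && belowStop) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<byte[]> points = new ArrayList<byte[]>();
        for (String s : new String[] { "a", "j", "m", "z" }) {
            points.add(s.getBytes(StandardCharsets.UTF_8));
        }
        byte[] b = "b".getBytes(StandardCharsets.UTF_8);
        byte[] l = "l".getBytes(StandardCharsets.UTF_8);
        // Prints "j": the only point inside [b, l)
        for (byte[] p : intersect(points, b, l)) {
            System.out.println(new String(p, StandardCharsets.UTF_8));
        }
    }
}
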
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
index cd88ce7..695c4c9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanRangesTest.java
@@ -72,7 +72,7 @@ public class ScanRangesTest {
             // incrementing the key too much.
             upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey);
         }
-        assertEquals(expectedResult, scanRanges.intersect(lowerInclusiveKey,upperExclusiveKey));
+        assertEquals(expectedResult, scanRanges.intersects(lowerInclusiveKey,upperExclusiveKey,0));
     }
 
     @Parameters(name="{0} {2}")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
index 3c0a952..063728c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
@@ -165,17 +165,22 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest {
                               KeyRange.getKeyRange(startKey2)));
         if (Bytes.compareTo(startKey1, startKey2) > 0) {
             expectedStartKey = startKey2;
-            expectedEndKey = ByteUtil.concat(startKey1, QueryConstants.SEPARATOR_BYTE_ARRAY);
+            expectedEndKey = startKey1;
             Collections.reverse(expectedRanges.get(0));
         } else {
             expectedStartKey = startKey1;
-            expectedEndKey = ByteUtil.concat(startKey2, QueryConstants.SEPARATOR_BYTE_ARRAY);;
+            expectedEndKey = startKey2;
         }
-        assertTrue(Bytes.compareTo(expectedStartKey, startKey) == 0);
-        assertTrue(Bytes.compareTo(expectedEndKey, stopKey) == 0);
+        assertEquals(0,startKey.length);
+        assertEquals(0,stopKey.length);
 
         assertNotNull(filter);
         assertTrue(filter instanceof SkipScanFilter);
+        SkipScanFilter skipScanFilter = (SkipScanFilter)filter;
+        assertEquals(1,skipScanFilter.getSlots().size());
+        assertEquals(2,skipScanFilter.getSlots().get(0).size());
+        assertArrayEquals(expectedStartKey, skipScanFilter.getSlots().get(0).get(0).getLowerRange());
+        assertArrayEquals(expectedEndKey, skipScanFilter.getSlots().get(0).get(1).getLowerRange());
         StatementContext context = plan.getContext();
         ScanRanges scanRanges = context.getScanRanges();
         List<List<KeyRange>> ranges = scanRanges.getRanges();

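The reworked assertions reflect the new plan shape: the scan's start and stop rows stay empty while the two point keys move into the SkipScanFilter. getSlots() returns one List<KeyRange> per row key slot; a sketch of unpacking slot 0 the way the assertions do (the helper name is hypothetical):

import java.util.List;

import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.query.KeyRange;

public class SkipScanSlotsSketch {
    // getSlots() yields one List<KeyRange> per row key slot; a point lookup is
    // a KeyRange whose lower bound is the point itself. Slot 0 here carries
    // both point keys, which is what the assertions above verify.
    static byte[][] pointKeysOfFirstSlot(SkipScanFilter filter) {
        List<KeyRange> firstSlot = filter.getSlots().get(0);
        byte[][] keys = new byte[firstSlot.size()][];
        for (int i = 0; i < firstSlot.size(); i++) {
            keys[i] = firstSlot.get(i).getLowerRange();
        }
        return keys;
    }
}
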
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
index bd19663..032768b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
@@ -1185,9 +1185,8 @@ public class WhereOptimizerTest extends BaseConnectionlessQueryTest {
         StatementContext context = compileStatement(query, binds);
         Scan scan = context.getScan();
         Filter filter = scan.getFilter();
-        assertNotNull(filter);
-        assertTrue(filter instanceof SkipScanFilter);
-        byte[] expectedStartRow = ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId));
+        assertNull(filter);
+        byte[] expectedStartRow = ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId2));
         byte[] expectedStopRow = ByteUtil.concat(ByteUtil.concat(PDataType.VARCHAR.toBytes(tenantId), PDataType.VARCHAR.toBytes(entityId2)), QueryConstants.SEPARATOR_BYTE_ARRAY);
         assertArrayEquals(expectedStartRow, scan.getStartRow());
         assertArrayEquals(expectedStopRow, scan.getStopRow());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
index ff31f7c..8ac322f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
@@ -27,6 +27,7 @@ import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.MULTI_CF_NAME;
 import static org.apache.phoenix.util.TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+import static org.apache.phoenix.util.TestUtil.PTSDB3_NAME;
 import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
 import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -103,6 +104,7 @@ public class BaseConnectionlessQueryTest extends BaseTest {
         ensureTableCreated(getUrl(), ENTITY_HISTORY_TABLE_NAME);
         ensureTableCreated(getUrl(), FUNKY_NAME);
         ensureTableCreated(getUrl(), PTSDB_NAME);
+        ensureTableCreated(getUrl(), PTSDB3_NAME);
         ensureTableCreated(getUrl(), MULTI_CF_NAME);
         ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME);
         ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME);
@@ -110,7 +112,6 @@ public class BaseConnectionlessQueryTest extends BaseTest {
         ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME);
         ensureTableCreated(getUrl(), TABLE_WITH_ARRAY);
         Properties props = new Properties();
-        //props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_TABLE_TIMESTAMP));
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP));
         PhoenixConnection conn = DriverManager.getConnection(PHOENIX_CONNECTIONLESS_JDBC_URL, props).unwrap(PhoenixConnection.class);
         try {


[4/6] git commit: PHOENIX-1251 Salted queries with range scan become full table scans

Posted by ja...@apache.org.
PHOENIX-1251 Salted queries with range scan become full table scans

Conflicts:
	phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
	phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java

Conflicts:
	phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
	phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
	phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
	phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
	phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
	phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
	phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
	phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
	phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
	phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorRegionSplitterFactory.java
	phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
	phoenix-core/src/main/java/org/apache/phoenix/query/StatsManagerImpl.java
	phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
	phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
	phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/88c6abb0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/88c6abb0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/88c6abb0

Branch: refs/heads/3.0
Commit: 88c6abb038d83a261be4a7fdc5388a20a8513a23
Parents: ff47a95
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Oct 1 08:49:04 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Oct 3 18:20:54 2014 -0700

----------------------------------------------------------------------
 .../BaseParallelIteratorsRegionSplitterIT.java  |  90 ----
 .../end2end/BaseTenantSpecificViewIndexIT.java  |   7 +-
 .../org/apache/phoenix/end2end/BaseViewIT.java  |   5 +-
 ...efaultParallelIteratorsRegionSplitterIT.java | 163 -------
 .../org/apache/phoenix/end2end/DeleteIT.java    |   1 +
 .../phoenix/end2end/GuidePostsLifeCycleIT.java  | 168 -------
 .../org/apache/phoenix/end2end/InListIT.java    |   5 +-
 .../org/apache/phoenix/end2end/KeyOnlyIT.java   |  57 +--
 .../phoenix/end2end/MultiCfQueryExecIT.java     |  73 +--
 .../phoenix/end2end/ParallelIteratorsIT.java    | 172 +++++++
 .../org/apache/phoenix/end2end/QueryPlanIT.java | 202 --------
 ...ipRangeParallelIteratorRegionSplitterIT.java | 109 ++++-
 .../end2end/SkipScanAfterManualSplitIT.java     |  30 +-
 .../apache/phoenix/end2end/StatsManagerIT.java  | 198 --------
 .../end2end/TenantSpecificTablesDMLIT.java      |  55 +--
 .../phoenix/end2end/index/SaltedIndexIT.java    |   4 +-
 .../apache/phoenix/cache/ServerCacheClient.java |  12 +-
 .../org/apache/phoenix/compile/QueryPlan.java   |   2 +
 .../org/apache/phoenix/compile/ScanRanges.java  | 370 ++++++++++++---
 .../phoenix/compile/StatementContext.java       |  46 +-
 .../apache/phoenix/compile/WhereOptimizer.java  |  39 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  85 ++--
 .../apache/phoenix/execute/AggregatePlan.java   |   4 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   | 216 +++++++++
 .../apache/phoenix/execute/BasicQueryPlan.java  | 211 ---------
 .../phoenix/execute/DegenerateQueryPlan.java    |   2 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |  13 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |   6 +-
 .../apache/phoenix/filter/SkipScanFilter.java   |  29 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |   1 -
 .../DefaultParallelIteratorRegionSplitter.java  | 174 -------
 .../apache/phoenix/iterate/ExplainTable.java    |   9 +-
 .../ParallelIteratorRegionSplitterFactory.java  |  38 --
 .../phoenix/iterate/ParallelIterators.java      | 456 ++++++++++++++-----
 ...SkipRangeParallelIteratorRegionSplitter.java |  83 ----
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   5 +
 .../java/org/apache/phoenix/query/KeyRange.java |   8 +
 .../org/apache/phoenix/query/StatsManager.java  |  59 ---
 .../apache/phoenix/query/StatsManagerImpl.java  | 214 ---------
 .../schema/stat/StatisticsCollector.java        |  47 +-
 .../phoenix/schema/stat/StatisticsTable.java    |  18 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |  66 ++-
 .../compile/ScanRangesIntersectTest.java        | 105 +++++
 .../apache/phoenix/compile/ScanRangesTest.java  |   2 +-
 .../phoenix/compile/WhereCompilerTest.java      |  13 +-
 .../phoenix/compile/WhereOptimizerTest.java     |   5 +-
 .../query/BaseConnectionlessQueryTest.java      |   3 +-
 .../org/apache/phoenix/query/QueryPlanTest.java | 179 ++++++++
 .../java/org/apache/phoenix/util/TestUtil.java  |  41 ++
 .../phoenix/pig/hadoop/PhoenixInputFormat.java  |  25 +-
 50 files changed, 1749 insertions(+), 2176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseParallelIteratorsRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseParallelIteratorsRegionSplitterIT.java
deleted file mode 100644
index 514b36e..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseParallelIteratorsRegionSplitterIT.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.util.TestUtil.STABLE_NAME;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.Maps;
-
-@Category(ClientManagedTimeTest.class)
-public class BaseParallelIteratorsRegionSplitterIT extends BaseClientManagedTimeIT {
-
-    protected static final byte[] KMIN  = new byte[] {'!'};
-    protected static final byte[] KMIN2  = new byte[] {'.'};
-    protected static final byte[] K1  = new byte[] {'a'};
-    protected static final byte[] K3  = new byte[] {'c'};
-    protected static final byte[] K4  = new byte[] {'d'};
-    protected static final byte[] K5  = new byte[] {'e'};
-    protected static final byte[] K6  = new byte[] {'f'};
-    protected static final byte[] K9  = new byte[] {'i'};
-    protected static final byte[] K11 = new byte[] {'k'};
-    protected static final byte[] K12 = new byte[] {'l'};
-    protected static final byte[] KMAX  = new byte[] {'~'};
-    protected static final byte[] KMAX2  = new byte[] {'z'};
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Must update config before starting server
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    
-    protected void initTableValues(long ts) throws Exception {
-        byte[][] splits = new byte[][] {K3,K4,K9,K11};
-        ensureTableCreated(getUrl(),STABLE_NAME,splits, ts-2);
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(url, props);
-        PreparedStatement stmt = conn.prepareStatement(
-                "upsert into " + STABLE_NAME + " VALUES (?, ?)");
-        stmt.setString(1, new String(KMIN));
-        stmt.setInt(2, 1);
-        stmt.execute();
-        stmt.setString(1, new String(KMAX));
-        stmt.setInt(2, 2);
-        stmt.execute();
-        conn.commit();
-        conn.close();
-    }
-
-    protected static TableRef getTableRef(Connection conn, long ts) throws SQLException {
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable(
-                new PTableKey(pconn.getTenantId(), STABLE_NAME)), ts, false);
-        return table;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
index cda44c5..c6a6a7e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
@@ -102,10 +102,11 @@ public class BaseTenantSpecificViewIndexIT extends BaseHBaseManagedTimeIT {
         conn.createStatement().execute("UPSERT INTO v(k2,v1,v2) VALUES (-1, 'blah', 'superblah')"); // sanity check that we can upsert after index is there
         conn.commit();
         ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT k1, k2, v2 FROM v WHERE v2='" + valuePrefix + "v2-1'");
-        assertEquals(saltBuckets == null ? 
+        String expected = saltBuckets == null ? 
                 "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _IDX_T ['" + tenantId + "',-32768,'" + valuePrefix + "v2-1']" :
-                "CLIENT PARALLEL 3-WAY SKIP SCAN ON 3 KEYS OVER _IDX_T [0,'" + tenantId + "',-32768,'" + valuePrefix + "v2-1'] - [2,'" + tenantId + "',-32768,'" + valuePrefix + "v2-1']\n" + 
-                "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+                "CLIENT PARALLEL 3-WAY RANGE SCAN OVER _IDX_T [0,'" + tenantId + "',-32768,'" + valuePrefix + "v2-1']\n" + 
+                "CLIENT MERGE SORT";
+        assertEquals(expected, QueryUtil.getExplainPlan(rs));
     }
     
     private Connection createTenantConnection(String tenantId) throws SQLException {

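The expected plan for the salted case changes from a 3-key SKIP SCAN to a per-bucket RANGE SCAN. The ITs read plans by running an EXPLAIN query; a sketch of that pattern, assuming a reachable cluster at jdbc:phoenix:localhost and the view and query from the test:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

import org.apache.phoenix.util.QueryUtil;

public class ExplainPlanSketch {
    public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        try {
            ResultSet rs = conn.createStatement()
                    .executeQuery("EXPLAIN SELECT k1, k2, v2 FROM v WHERE v2 = 'foov2-1'");
            // QueryUtil.getExplainPlan concatenates the EXPLAIN rows into one string
            System.out.println(QueryUtil.getExplainPlan(rs));
        } finally {
            conn.close();
        }
    }
}
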
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
index 0a5c197..e68c82e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
@@ -110,9 +110,10 @@ public class BaseViewIT extends BaseHBaseManagedTimeIT {
         assertEquals("bar", rs.getString(4));
         assertFalse(rs.next());
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+
         assertEquals(saltBuckets == null
                 ? "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _IDX_T [" + Short.MIN_VALUE + ",51]"
-                : "CLIENT PARALLEL " + saltBuckets + "-WAY SKIP SCAN ON 3 KEYS OVER _IDX_T [0," + Short.MIN_VALUE + ",51] - [2," + Short.MIN_VALUE + ",51]\nCLIENT MERGE SORT",
+                : "CLIENT PARALLEL " + saltBuckets + "-WAY RANGE SCAN OVER _IDX_T [0," + Short.MIN_VALUE + ",51]\nCLIENT MERGE SORT",
             QueryUtil.getExplainPlan(rs));
 
         conn.createStatement().execute("CREATE INDEX i2 on v(s)");
@@ -126,7 +127,7 @@ public class BaseViewIT extends BaseHBaseManagedTimeIT {
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         assertEquals(saltBuckets == null
                 ? "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _IDX_T [" + (Short.MIN_VALUE+1) + ",'foo']"
-                : "CLIENT PARALLEL " + saltBuckets + "-WAY SKIP SCAN ON 3 KEYS OVER _IDX_T [0," + (Short.MIN_VALUE+1) + ",'foo'] - [2," + (Short.MIN_VALUE+1) + ",'foo']\nCLIENT MERGE SORT",
+                : "CLIENT PARALLEL " + saltBuckets + "-WAY RANGE SCAN OVER _IDX_T [0," + (Short.MIN_VALUE+1) + ",'foo']\nCLIENT MERGE SORT",
             QueryUtil.getExplainPlan(rs));
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
deleted file mode 100644
index e7a1044..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.Maps;
-
-
-/**
- * Tests for {@link DefaultParallelIteratorRegionSplitter}.
- * 
- * 
- * @since 0.1
- */
-
-@Category(ClientManagedTimeTest.class)
-public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIteratorsRegionSplitterIT {
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Must update config before starting server
-        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan)
-            throws SQLException {
-        TableRef tableRef = getTableRef(conn, ts);
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final List<HRegionLocation> regions =  pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement));
-        DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE) {
-            @Override
-            protected List<HRegionLocation> getAllRegions() throws SQLException {
-                return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(), scan.getStopRow());
-            }
-        };
-        List<KeyRange> keyRanges = splitter.getSplits();
-        Collections.sort(keyRanges, new Comparator<KeyRange>() {
-            @Override
-            public int compare(KeyRange o1, KeyRange o2) {
-                return Bytes.compareTo(o1.getLowerRange(),o2.getLowerRange());
-            }
-        });
-        return keyRanges;
-    }
-
-    @Test
-    public void testGetSplits() throws Exception {
-        long ts = nextTimestamp();
-        initTableValues(ts);
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts + 2;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(url, props);
-        PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE");
-        stmt.execute();
-        Scan scan = new Scan();
-        
-        // number of regions > target query concurrency
-        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
-        scan.setStartRow(K1);
-        scan.setStopRow(K12);
-        List<KeyRange> keyRanges = getSplits(conn, ts, scan);
-        assertEquals("Unexpected number of splits: " + keyRanges, 7, keyRanges.size());
-        assertEquals(newKeyRange(KeyRange.UNBOUND, KMIN), keyRanges.get(0));
-        assertEquals(newKeyRange(KMIN, K3), keyRanges.get(1));
-        assertEquals(newKeyRange(K3, K4), keyRanges.get(2));
-        assertEquals(newKeyRange(K4, K9), keyRanges.get(3));
-        assertEquals(newKeyRange(K9, K11), keyRanges.get(4));
-        assertEquals(newKeyRange(K11, KMAX), keyRanges.get(5));
-        assertEquals(newKeyRange(KMAX,  KeyRange.UNBOUND), keyRanges.get(6));
-        
-        scan.setStartRow(K3);
-        scan.setStopRow(K6);
-        keyRanges = getSplits(conn, ts, scan);
-        assertEquals("Unexpected number of splits: " + keyRanges, 2, keyRanges.size());
-        // note that we get a single split from R2 due to small key space
-        assertEquals(newKeyRange(K3, K4), keyRanges.get(0));
-        assertEquals(newKeyRange(K4, K9), keyRanges.get(1));
-        
-        scan.setStartRow(K5);
-        scan.setStopRow(K6);
-        keyRanges = getSplits(conn, ts, scan);
-        assertEquals("Unexpected number of splits: " + keyRanges, 1, keyRanges.size());
-        assertEquals(newKeyRange(K4, K9), keyRanges.get(0));
-        conn.close();
-    }
-
-    @Test
-    public void testGetLowerUnboundSplits() throws Throwable {
-        long ts = nextTimestamp();
-        initTableValues(ts);
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(url, props);
-        PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE");
-        stmt.execute();
-        // The query would use all the split points here
-        conn.createStatement().executeQuery("SELECT * FROM STABLE");
-        conn.close();
-        Scan scan = new Scan();
-        scan.setStartRow(HConstants.EMPTY_START_ROW);
-        scan.setStopRow(K1);
-        List<KeyRange> keyRanges = getSplits(conn, ts, scan);
-        assertEquals("Unexpected number of splits: " + keyRanges, 2, keyRanges.size());
-        assertEquals(newKeyRange(KeyRange.UNBOUND, KMIN), keyRanges.get(0));
-        assertEquals(newKeyRange(KMIN, K3), keyRanges.get(1));
-    }
-
-    private static KeyRange newKeyRange(byte[] lowerRange, byte[] upperRange) {
-        return PDataType.CHAR.getKeyRange(lowerRange, true, upperRange, false);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 4d41141..a2dd942 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -235,6 +235,7 @@ public class DeleteIT extends BaseHBaseManagedTimeIT {
         testDeleteAllFromTableWithIndex(true, false);
     }
     
+    //@Ignore // TODO: JT to look at: SkipScanFilter:151 assert for skip_hint > current_key is failing 
     @Test
     public void testDeleteAllFromTableWithIndexNoAutoCommitNoSalting() throws SQLException {
         testDeleteAllFromTableWithIndex(false,false);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
deleted file mode 100644
index ba9f961..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.util.TestUtil.STABLE_NAME;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.Maps;
-
-@Category(HBaseManagedTimeTest.class)
-public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Must update config before starting server
-        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
-        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
-        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    
-    protected static final byte[] KMIN  = new byte[] {'!'};
-    protected static final byte[] KMIN2  = new byte[] {'.'};
-    protected static final byte[] K1  = new byte[] {'a'};
-    protected static final byte[] K3  = new byte[] {'c'};
-    protected static final byte[] K4  = new byte[] {'d'};
-    protected static final byte[] K5  = new byte[] {'e'};
-    protected static final byte[] K6  = new byte[] {'f'};
-    protected static final byte[] K9  = new byte[] {'i'};
-    protected static final byte[] K11 = new byte[] {'k'};
-    protected static final byte[] K12 = new byte[] {'l'};
-    protected static final byte[] KMAX  = new byte[] {'~'};
-    protected static final byte[] KMAX2  = new byte[] {'z'};
-    protected static final byte[] KR = new byte[] { 'r' };
-    protected static final byte[] KP = new byte[] { 'p' };
-
-    private static List<KeyRange> getSplits(Connection conn, final Scan scan) throws SQLException {
-        TableRef tableRef = getTableRef(conn);
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(
-                tableRef.getTable().getPhysicalName().getBytes());
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement));
-        DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(),
-                HintNode.EMPTY_HINT_NODE) {
-            @Override
-            protected List<HRegionLocation> getAllRegions() throws SQLException {
-                return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(),
-                        scan.getStopRow());
-            }
-        };
-        List<KeyRange> keyRanges = splitter.getSplits();
-        Collections.sort(keyRanges, new Comparator<KeyRange>() {
-            @Override
-            public int compare(KeyRange o1, KeyRange o2) {
-                return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange());
-            }
-        });
-        return keyRanges;
-    }
-
-    // This test ensures that as we keep adding new records the splits gets updated
-    @Test
-    public void testGuidePostsLifeCycle() throws Exception {
-        byte[][] splits = new byte[][] { K3, K9, KR };
-        ensureTableCreated(getUrl(), STABLE_NAME, splits);
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(url, props);
-        PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE");
-        stmt.execute();
-        Scan scan = new Scan();
-        List<KeyRange> keyRanges = getSplits(conn, scan);
-        assertEquals(4, keyRanges.size());
-        upsert(new byte[][] { KMIN, K4, K11 });
-        stmt = conn.prepareStatement("ANALYZE STABLE");
-        stmt.execute();
-        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); 
-        keyRanges = getSplits(conn, scan);
-        assertEquals(7, keyRanges.size());
-        upsert(new byte[][] { KMIN2, K5, K12 });
-        stmt = conn.prepareStatement("ANALYZE STABLE");
-        stmt.execute();
-        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
-        keyRanges = getSplits(conn, scan);
-        assertEquals(10, keyRanges.size());
-        upsert(new byte[][] { K1, K6, KP });
-        stmt = conn.prepareStatement("ANALYZE STABLE");
-        stmt.execute();
-        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
-        keyRanges = getSplits(conn, scan);
-        assertEquals(13, keyRanges.size());
-        conn.close();
-    }
-
-    protected void upsert( byte[][] val) throws Exception {
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(url, props);
-        PreparedStatement stmt = conn.prepareStatement("upsert into " + STABLE_NAME + " VALUES (?, ?)");
-        stmt.setString(1, new String(val[0]));
-        stmt.setInt(2, 1);
-        stmt.execute();
-        stmt.setString(1, new String(val[1]));
-        stmt.setInt(2, 2);
-        stmt.execute();
-        stmt.setString(1, new String(val[2]));
-        stmt.setInt(2, 3);
-        stmt.execute();
-        conn.commit();
-        conn.close();
-    }
-
-    protected static TableRef getTableRef(Connection conn) throws SQLException {
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable(
-                new PTableKey(pconn.getTenantId(), STABLE_NAME)), System.currentTimeMillis(), false);
-        return table;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
index dc60b69..920891b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
@@ -165,10 +165,7 @@ public class InListIT extends BaseHBaseManagedTimeIT {
     private static final List<PDataType> INTEGER_TYPES = Arrays.asList(PDataType.INTEGER, PDataType.LONG);
     private static final List<Integer> SALT_BUCKET_NUMBERS = Arrays.asList(0, 4);
 
-    // we should be including the RANGE_SCAN hint here, but a bug with ParallelIterators causes tests to fail
-    // see the relevant JIRA here: https://issues.apache.org/jira/browse/PHOENIX-1251
-    private static final List<String> HINTS = Arrays.asList("", "/*+ SKIP_SCAN */");
-//    private static final List<String> HINTS = Arrays.asList("", "/*+ SKIP_SCAN */", "/*+ RANGE_SCAN */");
+    private static final List<String> HINTS = Arrays.asList("", "/*+ SKIP_SCAN */", "/*+ RANGE_SCAN */");
     
     /**
      * Tests the given where clause against the given upserts by comparing against the list of

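With the PHOENIX-1251 fix in place, the RANGE_SCAN hint is exercised again alongside SKIP_SCAN. A hint is a /*+ ... */ comment placed immediately after SELECT; a sketch of building the three hinted variants (table t and column pk are illustrative):

import java.util.Arrays;
import java.util.List;

public class HintedQuerySketch {
    // The three variants the InListIT change re-enables; an empty hint lets
    // the optimizer pick between skip scan and range scan on its own.
    static final List<String> HINTS = Arrays.asList("", "/*+ SKIP_SCAN */", "/*+ RANGE_SCAN */");

    static String hinted(String hint, String select) {
        // A hint is a /*+ ... */ comment placed immediately after SELECT.
        return "SELECT " + (hint.isEmpty() ? "" : hint + " ") + select;
    }

    public static void main(String[] args) {
        for (String hint : HINTS) {
            // e.g. SELECT /*+ RANGE_SCAN */ pk FROM t WHERE pk IN ('a','b')
            System.out.println(hinted(hint, "pk FROM t WHERE pk IN ('a','b')"));
        }
    }
}
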
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
index ed081d9..4dee5d8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
@@ -19,35 +19,22 @@ package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.analyzeTable;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -88,8 +75,7 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT {
         assertEquals(3, rs.getInt(1));
         assertEquals(4, rs.getInt(2));
         assertFalse(rs.next());
-        Scan scan = new Scan();
-        List<KeyRange> splits = getSplits(conn5, ts, scan);
+        List<KeyRange> splits = getAllSplits(conn5, "KEYONLY");
         assertEquals(3, splits.size());
         conn5.close();
         
@@ -180,41 +166,4 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT {
         conn.commit();
         conn.close();
     }
-
-    private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException {
-        String query = "ANALYZE " + tableName;
-        conn.createStatement().execute(query);
-    }
-    
-    private static TableRef getTableRef(Connection conn, long ts) throws SQLException {
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable(
-                new PTableKey(pconn.getTenantId(), KEYONLY_NAME)), ts, false);
-        return table;
-    }
-
-    private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan) throws SQLException {
-        TableRef tableRef = getTableRef(conn, ts);
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(
-                tableRef.getTable().getPhysicalName().getBytes());
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement));
-        DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(),
-                HintNode.EMPTY_HINT_NODE) {
-            @Override
-            protected List<HRegionLocation> getAllRegions() throws SQLException {
-                return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(),
-                        scan.getStopRow());
-            }
-        };
-        List<KeyRange> keyRanges = splitter.getSplits();
-        Collections.sort(keyRanges, new Comparator<KeyRange>() {
-            @Override
-            public int compare(KeyRange o1, KeyRange o2) {
-                return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange());
-            }
-        });
-        return keyRanges;
-    }
 }

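The per-test analyzeTable and getSplits helpers removed above now live in TestUtil. The deleted body shows analyzeTable is just an ANALYZE statement; a sketch of the consolidated helper under that assumption (TestUtil's actual signatures beyond the call sites are not shown in this diff):

import java.sql.Connection;
import java.sql.SQLException;

public class AnalyzeSketch {
    // Grounded in the deleted per-test version: ANALYZE <table> collects
    // stats (guideposts) so subsequent queries can split on them.
    static void analyzeTable(Connection conn, String tableName) throws SQLException {
        conn.createStatement().execute("ANALYZE " + tableName);
    }
}
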
http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
index fbd1cf6..9f313ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
@@ -18,36 +18,23 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.analyzeTable;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -106,11 +93,6 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT {
         stmt.execute();
     }
 
-    private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException {
-        String query = "ANALYZE " + tableName;
-        conn.createStatement().execute(query);
-    }
-
     @Test
     public void testConstantCount() throws Exception {
         long ts = nextTimestamp();
@@ -174,14 +156,14 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT {
     }
     
     @Test
-    public void testCFToDisambiguate1() throws Exception {
+    public void testGuidePostsForMultiCFs() throws Exception {
         long ts = nextTimestamp();
+        initTableValues(ts);
         String query = "SELECT F.RESPONSE_TIME,G.RESPONSE_TIME from multi_cf where F.RESPONSE_TIME = 2222";
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(url, props);
         try {
-            initTableValues(ts);
             analyzeTable(conn, "MULTI_CF");
             PreparedStatement statement = conn.prepareStatement(query);
             ResultSet rs = statement.executeQuery();
@@ -189,16 +171,13 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT {
             assertEquals(2222, rs.getLong(1));
             assertEquals(22222, rs.getLong(2));
             assertFalse(rs.next());
-            Scan scan = new Scan();
-            // See if F has splits in it
-            scan.addFamily(Bytes.toBytes("E"));
-            List<KeyRange> splits = getSplits(conn, ts, scan);
+            // Use E column family. Since the column family with the empty key value (the first one, A)
+            // is always added to the scan, we never really use other guideposts (but this may change).
+            List<KeyRange> splits = getAllSplits(conn, "MULTI_CF", "e.cpu_utilization IS NOT NULL");
+            // Since the E column family is not populated, it won't have as many splits
             assertEquals(3, splits.size());
-            scan = new Scan();
-            // See if G has splits in it
-            scan.addFamily(Bytes.toBytes("G"));
-            splits = getSplits(conn, ts, scan);
-            // We get splits from different CF
+            // Same as above for G column family.
+            splits = getAllSplits(conn, "MULTI_CF", "g.response_time IS NOT NULL");
             assertEquals(3, splits.size());
         } finally {
             conn.close();
@@ -283,36 +262,4 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT {
             conn.close();
         }
     }
-
-    private static TableRef getTableRef(Connection conn, long ts) throws SQLException {
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable(
-                new PTableKey(pconn.getTenantId(), "MULTI_CF")), ts, false);
-        return table;
-    }
-
-    private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan) throws SQLException {
-        TableRef tableRef = getTableRef(conn, ts);
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(
-                tableRef.getTable().getPhysicalName().getBytes());
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement));
-        DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(),
-                HintNode.EMPTY_HINT_NODE) {
-            @Override
-            protected List<HRegionLocation> getAllRegions() throws SQLException {
-                return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(),
-                        scan.getStopRow());
-            }
-        };
-        List<KeyRange> keyRanges = splitter.getSplits();
-        Collections.sort(keyRanges, new Comparator<KeyRange>() {
-            @Override
-            public int compare(KeyRange o1, KeyRange o2) {
-                return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange());
-            }
-        });
-        return keyRanges;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java
new file mode 100644
index 0000000..97ca828
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.STABLE_NAME;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
+import static org.apache.phoenix.util.TestUtil.getSplits;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+
+@Category(HBaseManagedTimeTest.class)
+public class ParallelIteratorsIT extends BaseHBaseManagedTimeIT {
+
+    protected static final byte[] KMIN  = new byte[] {'!'};
+    protected static final byte[] KMIN2  = new byte[] {'.'};
+    protected static final byte[] K1  = new byte[] {'a'};
+    protected static final byte[] K3  = new byte[] {'c'};
+    protected static final byte[] K4  = new byte[] {'d'};
+    protected static final byte[] K5  = new byte[] {'e'};
+    protected static final byte[] K6  = new byte[] {'f'};
+    protected static final byte[] K9  = new byte[] {'i'};
+    protected static final byte[] K11 = new byte[] {'k'};
+    protected static final byte[] K12 = new byte[] {'l'};
+    protected static final byte[] KMAX  = new byte[] {'~'};
+    protected static final byte[] KMAX2  = new byte[] {'z'};
+    protected static final byte[] KR = new byte[] { 'r' };
+    protected static final byte[] KP = new byte[] { 'p' };
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testGetSplits() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), TEST_PROPERTIES);
+        initTableValues(conn);
+        
+        PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE");
+        stmt.execute();
+        
+        // number of regions > target query concurrency
+        PhoenixPreparedStatement pstmt;
+        List<KeyRange> keyRanges;
+        
+        pstmt = conn.prepareStatement("SELECT COUNT(*) FROM STABLE").unwrap(PhoenixPreparedStatement.class);
+        pstmt.execute();
+        keyRanges = getAllSplits(conn);
+        assertEquals("Unexpected number of splits: " + keyRanges, 7, keyRanges.size());
+        assertEquals(newKeyRange(KeyRange.UNBOUND, KMIN), keyRanges.get(0));
+        assertEquals(newKeyRange(KMIN, K3), keyRanges.get(1));
+        assertEquals(newKeyRange(K3, K4), keyRanges.get(2));
+        assertEquals(newKeyRange(K4, K9), keyRanges.get(3));
+        assertEquals(newKeyRange(K9, K11), keyRanges.get(4));
+        assertEquals(newKeyRange(K11, KMAX), keyRanges.get(5));
+        assertEquals(newKeyRange(KMAX,  KeyRange.UNBOUND), keyRanges.get(6));
+        
+        keyRanges = getSplits(conn, K3, K6);
+        assertEquals("Unexpected number of splits: " + keyRanges, 2, keyRanges.size());
+        assertEquals(newKeyRange(K3, K4), keyRanges.get(0));
+        assertEquals(newKeyRange(K4, K6), keyRanges.get(1));
+        
+        keyRanges = getSplits(conn, K5, K6);
+        assertEquals("Unexpected number of splits: " + keyRanges, 1, keyRanges.size());
+        assertEquals(newKeyRange(K5, K6), keyRanges.get(0));
+        
+        keyRanges = getSplits(conn, null, K1);
+        assertEquals("Unexpected number of splits: " + keyRanges, 2, keyRanges.size());
+        assertEquals(newKeyRange(KeyRange.UNBOUND, KMIN), keyRanges.get(0));
+        assertEquals(newKeyRange(KMIN, K1), keyRanges.get(1));
+        conn.close();
+    }
+
+    @Test
+    public void testGuidePostsLifeCycle() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), TEST_PROPERTIES);
+        byte[][] splits = new byte[][] { K3, K9, KR };
+        ensureTableCreated(getUrl(), STABLE_NAME, splits);
+
+        PreparedStatement stmt = conn.prepareStatement("ANALYZE STABLE");
+        stmt.execute();
+        List<KeyRange> keyRanges = getAllSplits(conn);
+        assertEquals(4, keyRanges.size());
+        upsert(conn, new byte[][] { KMIN, K4, K11 });
+        stmt = conn.prepareStatement("ANALYZE STABLE");
+        stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); 
+        keyRanges = getAllSplits(conn);
+        assertEquals(7, keyRanges.size());
+        upsert(conn, new byte[][] { KMIN2, K5, K12 });
+        stmt = conn.prepareStatement("ANALYZE STABLE");
+        stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
+        keyRanges = getAllSplits(conn);
+        assertEquals(10, keyRanges.size());
+        upsert(conn, new byte[][] { K1, K6, KP });
+        stmt = conn.prepareStatement("ANALYZE STABLE");
+        stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
+        keyRanges = getAllSplits(conn);
+        assertEquals(13, keyRanges.size());
+        conn.close();
+    }
+
+    private static void upsert(Connection conn, byte[][] val) throws Exception {
+        PreparedStatement stmt = conn.prepareStatement("upsert into " + STABLE_NAME + " VALUES (?, ?)");
+        stmt.setString(1, new String(val[0]));
+        stmt.setInt(2, 1);
+        stmt.execute();
+        stmt.setString(1, new String(val[1]));
+        stmt.setInt(2, 2);
+        stmt.execute();
+        stmt.setString(1, new String(val[2]));
+        stmt.setInt(2, 3);
+        stmt.execute();
+        conn.commit();
+    }
+
+    private static KeyRange newKeyRange(byte[] lowerRange, byte[] upperRange) {
+        return PDataType.CHAR.getKeyRange(lowerRange, true, upperRange, false);
+    }
+    
+    private static void initTableValues(Connection conn) throws Exception {
+        byte[][] splits = new byte[][] {K3,K4,K9,K11};
+        ensureTableCreated(getUrl(),STABLE_NAME,splits);
+        PreparedStatement stmt = conn.prepareStatement("upsert into " + STABLE_NAME + " VALUES (?, ?)");
+        stmt.setString(1, new String(KMIN));
+        stmt.setInt(2, 1);
+        stmt.execute();
+        stmt.setString(1, new String(KMAX));
+        stmt.setInt(2, 2);
+        stmt.execute();
+        conn.commit();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPlanIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPlanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPlanIT.java
deleted file mode 100644
index 320ba72..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPlanIT.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
-import static org.apache.phoenix.util.TestUtil.PTSDB3_NAME;
-import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
-import static org.junit.Assert.assertEquals;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.Maps;
-
-@Category(HBaseManagedTimeTest.class)
-public class QueryPlanIT extends BaseHBaseManagedTimeIT {
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
-        // Override date format so we don't have a bunch of zeros
-        props.put(QueryServices.DATE_FORMAT_ATTRIB, "yyyy-MM-dd");
-        // Must update config before starting server
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    
-    @Test
-    public void testExplainPlan() throws Exception {
-        ensureTableCreated(getUrl(), ATABLE_NAME, getDefaultSplits(getOrganizationId()));
-        ensureTableCreated(getUrl(), PTSDB_NAME, getDefaultSplits(getOrganizationId()));
-        ensureTableCreated(getUrl(), PTSDB3_NAME, getDefaultSplits(getOrganizationId()));
-        String[] queryPlans = new String[] {
-
-                "SELECT host FROM PTSDB3 WHERE host IN ('na1', 'na2','na3')",
-                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 KEYS OVER PTSDB3 [~'na3'] - [~'na1']\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY",
-
-                "SELECT host FROM PTSDB WHERE inst IS NULL AND host IS NOT NULL AND date >= to_date('2013-01-01')",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER PTSDB [null,not null]\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY AND DATE >= '2013-01-01 00:00:00.000'",
-
-                // Since inst IS NOT NULL is unbounded, we won't continue optimizing
-                "SELECT host FROM PTSDB WHERE inst IS NOT NULL AND host IS NULL AND date >= to_date('2013-01-01')",
-                "CLIENT PARALLEL 4-WAY RANGE SCAN OVER PTSDB [not null]\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY AND (HOST IS NULL AND DATE >= '2013-01-01 00:00:00.000')",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id = '000000000000002' AND x_integer = 2 AND a_integer < 5 ",
-                "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER ATABLE\n" + 
-                "    SERVER FILTER BY (X_INTEGER = 2 AND A_INTEGER < 5)",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000001','000000000000005') ",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000005'] - ['000000000000001','000000000000008']",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) <= ('000000000000001','000000000000005') ",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000003'] - ['000000000000001','000000000000006']",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id > '000000000000001' AND entity_id > '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000003','000000000000005') ",
-                "CLIENT PARALLEL 4-WAY RANGE SCAN OVER ATABLE ['000000000000003','000000000000005'] - [*]\n" + 
-                "    SERVER FILTER BY (ENTITY_ID > '000000000000002' AND ENTITY_ID < '000000000000008')",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id >= '000000000000002' AND entity_id < '000000000000008' AND (organization_id,entity_id) >= ('000000000000000','000000000000005') ",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','000000000000002'] - ['000000000000001','000000000000008']",
-
-                "SELECT * FROM atable",
-                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE",
-
-                "SELECT inst,host FROM PTSDB WHERE REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1', 'na2','na3')", // REVIEW: should this use skip scan given the regexpr_substr
-                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 3 RANGES OVER PTSDB ['na1'] - ['na4']\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY AND REGEXP_SUBSTR(INST, '[^-]+', 1) IN ('na1','na2','na3')",
-
-                "SELECT inst,host FROM PTSDB WHERE inst IN ('na1', 'na2','na3') AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')",
-                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 6 RANGES OVER PTSDB ['na1','a','2013-01-01'] - ['na3','b','2013-01-02']\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY",
-
-                "SELECT inst,host FROM PTSDB WHERE inst LIKE 'na%' AND host IN ('a','b') AND date >= to_date('2013-01-01') AND date < to_date('2013-01-02')",
-                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 RANGES OVER PTSDB ['na','a','2013-01-01'] - ['nb','b','2013-01-02']\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY",
-
-                "SELECT count(*) FROM atable",
-                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
-                "    SERVER AGGREGATE INTO SINGLE ROW",
-
-                "SELECT count(*) FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003            '] - ['000000000000001','004            ']\n" + 
-                "    SERVER FILTER BY FIRST KEY ONLY\n" + 
-                "    SERVER AGGREGATE INTO SINGLE ROW",
-
-                "SELECT a_string FROM atable WHERE organization_id='000000000000001' AND SUBSTR(entity_id,1,3) > '002' AND SUBSTR(entity_id,1,3) <= '003'",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001','003            '] - ['000000000000001','004            ']",
-
-                "SELECT count(1) FROM atable GROUP BY a_string",
-                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" +
-                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" +
-                "CLIENT MERGE SORT",
-
-                "SELECT count(1) FROM atable GROUP BY a_string LIMIT 5",
-                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + 
-                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING]\n" + 
-                "CLIENT MERGE SORT\n" + 
-                "CLIENT 5 ROW LIMIT",
-
-                "SELECT a_string FROM atable ORDER BY a_string DESC LIMIT 3",
-                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + 
-                "    SERVER TOP 3 ROWS SORTED BY [A_STRING DESC]\n" + 
-                "CLIENT MERGE SORT",
-
-                "SELECT count(1) FROM atable GROUP BY a_string,b_string HAVING max(a_string) = 'a'",
-                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" +
-                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
-                "CLIENT MERGE SORT\n" +
-                "CLIENT FILTER BY MAX(A_STRING) = 'a'",
-
-                "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY ROUND(a_time,'HOUR',2),entity_id HAVING max(a_string) = 'a'",
-                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" +
-                "    SERVER FILTER BY A_INTEGER = 1\n" +
-                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [ENTITY_ID, ROUND(A_TIME)]\n" +
-                "CLIENT MERGE SORT\n" +
-                "CLIENT FILTER BY MAX(A_STRING) = 'a'",
-
-                "SELECT count(1) FROM atable WHERE a_integer = 1 GROUP BY a_string,b_string HAVING max(a_string) = 'a' ORDER BY b_string",
-                "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" +
-                "    SERVER FILTER BY A_INTEGER = 1\n" +
-                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
-                "CLIENT MERGE SORT\n" +
-                "CLIENT FILTER BY MAX(A_STRING) = 'a'\n" +
-                "CLIENT SORTED BY [B_STRING]",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' AND entity_id != '000000000000002' AND x_integer = 2 AND a_integer < 5 LIMIT 10",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
-                "    SERVER FILTER BY (ENTITY_ID != '000000000000002' AND X_INTEGER = 2 AND A_INTEGER < 5)\n" + 
-                "    SERVER 10 ROW LIMIT\n" + 
-                "CLIENT 10 ROW LIMIT",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string ASC NULLS FIRST LIMIT 10",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
-                "    SERVER TOP 10 ROWS SORTED BY [A_STRING]\n" + 
-                "CLIENT MERGE SORT",
-
-                "SELECT max(a_integer) FROM atable WHERE organization_id = '000000000000001' GROUP BY organization_id,entity_id,ROUND(a_date,'HOUR') ORDER BY entity_id NULLS LAST LIMIT 10",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
-                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [ORGANIZATION_ID, ENTITY_ID, ROUND(A_DATE)]\n" + 
-                "CLIENT MERGE SORT\n" + 
-                "CLIENT TOP 10 ROWS SORTED BY [ENTITY_ID NULLS LAST]",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id = '000000000000001' ORDER BY a_string DESC NULLS LAST LIMIT 10",
-                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER ATABLE ['000000000000001']\n" + 
-                "    SERVER TOP 10 ROWS SORTED BY [A_STRING DESC NULLS LAST]\n" + 
-                "CLIENT MERGE SORT",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id IN ('000000000000001', '000000000000005')",
-                "CLIENT PARALLEL 1-WAY SKIP SCAN ON 2 KEYS OVER ATABLE ['000000000000001'] - ['000000000000005']",
-
-                "SELECT a_string,b_string FROM atable WHERE organization_id IN ('00D000000000001', '00D000000000005') AND entity_id IN('00E00000000000X','00E00000000000Z')",
-                "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 4 KEYS OVER ATABLE",
-        };
-        for (int i = 0; i < queryPlans.length; i+=2) {
-            String query = queryPlans[i];
-            String plan = queryPlans[i+1];
-            Properties props = new Properties();
-            Connection conn = DriverManager.getConnection(getUrl(), props);
-            try {
-                Statement statement = conn.createStatement();
-                ResultSet rs = statement.executeQuery("EXPLAIN " + query);
-                // TODO: figure out a way of verifying that query isn't run during explain execution
-                assertEquals(query, plan, QueryUtil.getExplainPlan(rs));
-            } catch (Exception e) {
-                throw new Exception(query + ": "+ e.getMessage(), e);
-            } finally {
-                conn.close();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
index 28bc011..3d057ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
@@ -22,12 +22,12 @@ import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ParameterMetaData;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -36,14 +36,23 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.filter.SkipScanFilter;
-import org.apache.phoenix.iterate.SkipRangeParallelIteratorRegionSplitter;
+import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PDataType;
@@ -58,6 +67,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.ScanUtil;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -71,6 +81,11 @@ import com.google.common.collect.Maps;
 
 /**
  * Tests for {@link SkipRangeParallelIteratorRegionSplitter}.
+ * TODO: Change this to be a connectionless test (ParallelIteratorsTest) with the ability to specify split points.
+ * -- On Connectionless, remember the split points of a table and use those when
+ * -- getRegionLocations is called.
+ * -- Then drive this from a query, getting the query plan and confirming the ranges
+ * -- from the plan.
  */
 @RunWith(Parameterized.class)
 @Category(ClientManagedTimeTest.class)
@@ -96,6 +111,7 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
     }
 
     @Test
+    @Ignore
     public void testGetSplitsWithSkipScanFilter() throws Exception {
         byte[][] splits = new byte[][] {Ka1A, Ka1B, Ka1E, Ka1G, Ka1I, Ka2A};
         long ts = nextTimestamp();
@@ -106,9 +122,6 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         
         initTableValues();
-        PreparedStatement stmt = conn.prepareStatement("ANALYZE "+TABLE_NAME);
-        stmt.execute();
-        conn.close();
         TableRef tableRef = new TableRef(null,pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME)),ts, false);
         List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
         List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges);
@@ -322,7 +335,7 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
-    private static List<KeyRange> getSplits(TableRef tableRef, final Scan scan, final List<HRegionLocation> regions,
+    private static List<KeyRange> getSplits(final TableRef tableRef, final Scan scan, final List<HRegionLocation> regions,
             final ScanRanges scanRanges) throws SQLException {
         final List<TableRef> tableRefs = Collections.singletonList(tableRef);
         ColumnResolver resolver = new ColumnResolver() {
@@ -345,17 +358,83 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
             
         };
         PhoenixConnection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
-        PhoenixStatement statement = new PhoenixStatement(connection);
-        StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
+        final PhoenixStatement statement = new PhoenixStatement(connection);
+        final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
         context.setScanRanges(scanRanges);
-        SkipRangeParallelIteratorRegionSplitter splitter = SkipRangeParallelIteratorRegionSplitter.getInstance(context, tableRef.getTable(), HintNode.EMPTY_HINT_NODE);
-        List<KeyRange> keyRanges = splitter.getSplits();
-        Collections.sort(keyRanges, new Comparator<KeyRange>() {
+        ParallelIterators parallelIterators = new ParallelIterators(new QueryPlan() {
+
+            @Override
+            public StatementContext getContext() {
+                return context;
+            }
+
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return ExplainPlan.EMPTY_PLAN;
+            }
+
+            @Override
+            public ResultIterator iterator() throws SQLException {
+                return ResultIterator.EMPTY_ITERATOR;
+            }
+
             @Override
-            public int compare(KeyRange o1, KeyRange o2) {
-                return Bytes.compareTo(o1.getLowerRange(),o2.getLowerRange());
+            public long getEstimatedSize() {
+                return 0;
             }
-        });
+
+            @Override
+            public TableRef getTableRef() {
+                return tableRef;
+            }
+
+            @Override
+            public RowProjector getProjector() {
+                return RowProjector.EMPTY_PROJECTOR;
+            }
+
+            @Override
+            public Integer getLimit() {
+                return null;
+            }
+
+            @Override
+            public OrderBy getOrderBy() {
+                return OrderBy.EMPTY_ORDER_BY;
+            }
+
+            @Override
+            public GroupBy getGroupBy() {
+                return GroupBy.EMPTY_GROUP_BY;
+            }
+
+            @Override
+            public List<KeyRange> getSplits() {
+                return null;
+            }
+
+            @Override
+            public FilterableStatement getStatement() {
+                return SelectStatement.SELECT_ONE;
+            }
+
+            @Override
+            public boolean isDegenerate() {
+                return false;
+            }
+
+            @Override
+            public boolean isRowKeyOrdered() {
+                return true;
+            }
+            
+        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+        List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
index 6f6c9a7..a07ad0e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
@@ -128,12 +128,13 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
             assertEquals(nRegions, nInitialRegions);
             
             int nRows = 2;
-            String query = "SELECT /*+ NO_INTRA_REGION_PARALLELIZATION */ count(*) FROM S WHERE a IN ('tl','jt')";
+            String query = "SELECT /*+ NO_INTRA_REGION_PARALLELIZATION */ count(*) FROM S WHERE a IN ('tl','jt',' a',' b',' c',' d')";
             ResultSet rs1 = conn.createStatement().executeQuery(query);
             assertTrue(rs1.next());
             nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
             // Region cache has been updated, as there are more regions now
             assertNotEquals(nRegions, nInitialRegions);
+            /*
             if (nRows != rs1.getInt(1)) {
                 // Run the same query again and it always passes now
                 // (as region cache is up-to-date)
@@ -141,6 +142,7 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
                 assertTrue(r2.next());
                 assertEquals(nRows, r2.getInt(1));
             }
+            */
             assertEquals(nRows, rs1.getInt(1));
         } finally {
             admin.close();
@@ -346,4 +348,30 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
         assertFalse(rs.next());
     }
 
+    @Test
+    public void testMinMaxRangeIntersection() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        
+        PreparedStatement stmt = conn.prepareStatement("create table splits_test "
+            + "(pk1 UNSIGNED_TINYINT NOT NULL, pk2 UNSIGNED_TINYINT NOT NULL, kv VARCHAR "
+            + "CONSTRAINT pk PRIMARY KEY (pk1, pk2)) SALT_BUCKETS=4 SPLIT ON (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+        // Split each salt bucket into multiple regions
+        stmt.setBytes(1, new byte[] {0, 1, 1});
+        stmt.setBytes(2, new byte[] {0, 2, 1});
+        stmt.setBytes(3, new byte[] {0, 3, 1});
+        stmt.setBytes(4, new byte[] {1, 1, 1});
+        stmt.setBytes(5, new byte[] {1, 2, 1});
+        stmt.setBytes(6, new byte[] {1, 3, 1});
+        stmt.setBytes(7, new byte[] {2, 1, 1});
+        stmt.setBytes(8, new byte[] {2, 2, 1});
+        stmt.setBytes(9, new byte[] {2, 3, 1});
+        stmt.setBytes(10, new byte[] {3, 1, 1});
+        stmt.setBytes(11, new byte[] {3, 2, 1});
+        stmt.setBytes(12, new byte[] {3, 3, 1});
+        stmt.execute();
+        
+        // Use a query with an RVC in a non-equality expression
+        ResultSet rs = conn.createStatement().executeQuery("select count(kv) from splits_test where pk1 <= 3 and (pk1,PK2) >= (3, 1)");
+        assertTrue(rs.next());
+    }
 }
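
For orientation, the SPLIT ON byte arrays above follow the salted row-key layout: with SALT_BUCKETS=4 the first key byte is the salt bucket (0 through 3), so {0, 2, 1} is a split inside bucket 0. A simplified sketch of that layout (illustrative only; Phoenix actually derives the salt byte by hashing the row key modulo the bucket count):

    // Simplified sketch: a salted key is one salt byte prepended to the PK bytes,
    // matching the 3-byte SPLIT ON arrays in the test above (salt, pk1, pk2).
    static byte[] saltedKey(byte saltBucket, byte pk1, byte pk2) {
        return new byte[] { saltBucket, pk1, pk2 };
    }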

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsManagerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsManagerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsManagerIT.java
deleted file mode 100644
index b13379b..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsManagerIT.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.util.TestUtil.STABLE_NAME;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.util.Properties;
-
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.StatsManager;
-import org.apache.phoenix.query.StatsManagerImpl;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.TimeKeeper;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-
-/**
- * 
- * Test for stats manager, which is a client-side process that caches the
- * first and last key of a given table. The {@link #testStatsManager()}
- * test must be the only test here, as it relies on state that is only
- * cleared between test runs.
- *
- */
-
-@Category(ClientManagedTimeTest.class)
-public class StatsManagerIT extends BaseParallelIteratorsRegionSplitterIT {
-    
-    private static class ManualTimeKeeper implements TimeKeeper {
-        private long currentTime = 0;
-        @Override
-        public long getCurrentTime() {
-            return currentTime;
-        }
-        
-        public void setCurrentTime(long currentTime) {
-            this.currentTime = currentTime;
-        }
-    }
-
-    private static interface ChangeDetector {
-        boolean isChanged();
-    }
-
-    private boolean waitForAsyncChange(ChangeDetector detector, long maxWaitTimeMs) throws Exception {
-        long startTime = System.currentTimeMillis();
-        do {
-            if (detector.isChanged()) {
-                return true;
-            }
-            try {
-                Thread.sleep(500);
-            } catch (InterruptedException e) {
-                throw e;
-            }
-        } while (System.currentTimeMillis() - startTime < maxWaitTimeMs);
-        return false;
-    }
-
-    private static class MinKeyChange implements ChangeDetector {
-        private byte[] value;
-        private StatsManager stats;
-        private TableRef table;
-        
-        public MinKeyChange(StatsManager stats, TableRef table) {
-            this.value = stats.getMinKey(table);
-            this.stats = stats;
-            this.table = table;
-        }
-        @Override
-        public boolean isChanged() {
-            return value != stats.getMinKey(table);
-        }
-    }
-
-    private static class MaxKeyChange implements ChangeDetector {
-        private byte[] value;
-        private StatsManager stats;
-        private TableRef table;
-        
-        public MaxKeyChange(StatsManager stats, TableRef table) {
-            this.value = stats.getMaxKey(table);
-            this.stats = stats;
-            this.table = table;
-        }
-        @Override
-        public boolean isChanged() {
-            return value != stats.getMaxKey(table);
-        }
-    }
-
-    @Test
-    public void testStatsManager() throws Exception {
-        long ts = nextTimestamp();
-        initTableValues(ts);
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(url, props);
-        TableRef table = getTableRef(conn, ts);
-
-        int updateFreq = 5;
-        int maxAge = 10;
-        int startTime = 100;
-        long waitTime = 5000;
-        
-        ManualTimeKeeper timeKeeper = new ManualTimeKeeper();
-        timeKeeper.setCurrentTime(startTime);
-        ConnectionQueryServices services = driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
-        StatsManager stats = new StatsManagerImpl(services, updateFreq, maxAge, timeKeeper);
-        MinKeyChange minKeyChange = new MinKeyChange(stats, table);
-        MaxKeyChange maxKeyChange = new MaxKeyChange(stats, table);
-        
-        byte[] minKey = minKeyChange.value;
-        assertTrue(minKey == null);
-        assertTrue(waitForAsyncChange(minKeyChange,waitTime));
-        assertArrayEquals(KMIN, stats.getMinKey(table));
-        assertArrayEquals(KMAX, stats.getMaxKey(table));
-        minKeyChange = new MinKeyChange(stats, table);
-        
-        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts+2;
-        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        conn = DriverManager.getConnection(url, props);
-        PreparedStatement delStmt = conn.prepareStatement("delete from " + STABLE_NAME + " where id=?");
-        delStmt.setString(1, new String(KMIN));
-        delStmt.execute();
-        PreparedStatement upsertStmt = conn.prepareStatement("upsert into " + STABLE_NAME + " VALUES (?, ?)");
-        upsertStmt.setString(1, new String(KMIN2));
-        upsertStmt.setInt(2, 1);
-        upsertStmt.execute();
-        conn.commit();
-
-        assertFalse(waitForAsyncChange(minKeyChange,waitTime)); // Stats won't change until they're attempted to be retrieved again
-        timeKeeper.setCurrentTime(timeKeeper.getCurrentTime() + updateFreq);
-        minKeyChange = new MinKeyChange(stats, table); // Will kick off change, but will update asynchronously
-        assertArrayEquals(KMIN, minKeyChange.value);
-        assertTrue(waitForAsyncChange(minKeyChange,waitTime));
-        assertArrayEquals(KMIN2, stats.getMinKey(table));
-        assertArrayEquals(KMAX, stats.getMaxKey(table));
-        minKeyChange = new MinKeyChange(stats, table);
-        
-        timeKeeper.setCurrentTime(timeKeeper.getCurrentTime() + maxAge);
-        minKeyChange = new MinKeyChange(stats, table); // Will kick off change, but will update asynchronously
-        assertTrue(null == minKeyChange.value);
-        assertTrue(waitForAsyncChange(minKeyChange,waitTime));
-        assertArrayEquals(KMIN2, stats.getMinKey(table));
-        assertArrayEquals(KMAX, stats.getMaxKey(table));
-        maxKeyChange = new MaxKeyChange(stats, table);
-        
-        delStmt.setString(1, new String(KMAX));
-        delStmt.execute();
-        upsertStmt.setString(1, new String(KMAX2));
-        upsertStmt.setInt(2, 1);
-        upsertStmt.execute();
-        conn.commit();
-        conn.close();
-
-        assertFalse(waitForAsyncChange(maxKeyChange,waitTime)); // Stats won't change until they're attempted to be retrieved again
-        timeKeeper.setCurrentTime(timeKeeper.getCurrentTime() + updateFreq);
-        maxKeyChange = new MaxKeyChange(stats, table); // Will kick off change, but will update asynchronously
-        assertArrayEquals(KMAX, maxKeyChange.value);
-        assertTrue(waitForAsyncChange(maxKeyChange,waitTime));
-        assertArrayEquals(KMAX2, stats.getMaxKey(table));
-        assertArrayEquals(KMIN2, stats.getMinKey(table));
-        maxKeyChange = new MaxKeyChange(stats, table);
-        
-        timeKeeper.setCurrentTime(timeKeeper.getCurrentTime() + maxAge);
-        maxKeyChange = new MaxKeyChange(stats, table); // Will kick off change, but will update asynchronously
-        assertTrue(null == maxKeyChange.value);
-        assertTrue(waitForAsyncChange(maxKeyChange,waitTime));
-        assertArrayEquals(KMIN2, stats.getMinKey(table));
-        assertArrayEquals(KMAX2, stats.getMaxKey(table));
-    }
-}


[5/6] git commit: PHOENIX-1251 Salted queries with range scan become full table scans

Posted by ja...@apache.org.
PHOENIX-1251 Salted queries with range scan become full table scans


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5f6f80b8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5f6f80b8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5f6f80b8

Branch: refs/heads/3.0
Commit: 5f6f80b83b07609e4990eded142ff9b6f09393a5
Parents: 88c6abb
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Oct 3 20:55:50 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Oct 3 20:55:50 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/phoenix/compile/ScanRanges.java   |  6 ++++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java     |  6 +++---
 .../org/apache/phoenix/iterate/ParallelIterators.java |  2 +-
 .../org/apache/phoenix/schema/MetaDataClient.java     | 14 +++++++-------
 pom.xml                                               |  2 +-
 5 files changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
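
Illustration only (hypothetical table name and connection URL, not part of the patch): the class of query this commit fixes is a row-key range scan over a salted table, which per the commit title was previously degenerating into a full table scan.

    import java.sql.*;

    public class SaltedRangeScanExample {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE IF NOT EXISTS SALTED_EXAMPLE "
                        + "(K VARCHAR PRIMARY KEY, V INTEGER) SALT_BUCKETS=4");
                // With the fix, the plan should report a RANGE SCAN rather than a FULL SCAN.
                ResultSet rs = stmt.executeQuery(
                        "EXPLAIN SELECT V FROM SALTED_EXAMPLE WHERE K >= 'a' AND K < 'b'");
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }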


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f6f80b8/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 1bd8cef..533d752 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -210,6 +210,9 @@ public class ScanRanges {
     public Scan intersectScan(Scan scan, final byte[] originalStartKey, final byte[] originalStopKey, final int keyOffset) {
         byte[] startKey = originalStartKey;
         byte[] stopKey = originalStopKey;
+        if (stopKey.length > 0 && Bytes.compareTo(startKey, stopKey) >= 0) {
+            return null;
+        }
         boolean mayHaveRows = false;
         // Keep the keys as they are if we have a point lookup, as we've already resolved the
         // salt bytes in that case.
@@ -338,6 +341,9 @@ public class ScanRanges {
                 scanStopKey = prefixKey(scanStopKey, scanKeyOffset, prefixBytes, keyOffset);
             }
         }
+        if (scanStopKey.length > 0 && Bytes.compareTo(scanStartKey, scanStopKey) >= 0) {
+            return null;
+        }
         newScan.setStartRow(scanStartKey);
         newScan.setStopRow(scanStopKey);
         

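The two guards added above short-circuit an empty intersection; a standalone restatement of the check (a sketch, relying on the HBase convention that an empty stop key means unbounded):

    import org.apache.hadoop.hbase.util.Bytes;

    final class RangeGuard {
        // A scan whose start key has reached or passed a non-empty stop key selects
        // nothing, so the caller returns null instead of issuing a degenerate scan.
        static boolean isEmptyRange(byte[] startKey, byte[] stopKey) {
            return stopKey.length > 0 && Bytes.compareTo(startKey, stopKey) >= 0;
        }
    }
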
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f6f80b8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index f1f05be..17e5e15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -482,10 +482,10 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                 KeyValue current = result.raw()[0];
                 int tableNameLength = tableNameBytes.length + 1;
                 int cfOffset = current.getRowOffset() + tableNameLength;
-                int cfLength = getVarCharLength(current.getRow(), cfOffset, current.getRowLength() - tableNameLength);
-                ptr.set(current.getRow(), cfOffset, cfLength);
+                int cfLength = getVarCharLength(current.getBuffer(), cfOffset, current.getRowLength() - tableNameLength);
+                ptr.set(current.getBuffer(), cfOffset, cfLength);
                 byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getValue(), current.getValueOffset(), current
+                PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getBuffer(), current.getValueOffset(), current
                         .getValueLength());
                 if (array != null && array.getDimensions() != 0) {
                     List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions());                        
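
The getRow()/getValue() to getBuffer() switch above matters because KeyValue offsets such as getRowOffset() are positions within the shared backing buffer, while getRow() and getValue() return fresh copies indexed from zero; pairing an offset with a copy reads the wrong bytes. A sketch of the distinction (kv is a stand-in):

    // Sketch: offsets from a KeyValue apply to its backing buffer, not to copies.
    static byte[] readRow(org.apache.hadoop.hbase.KeyValue kv) {
        byte[] backing = kv.getBuffer(); // shared backing array; offsets are valid here
        int rowOff = kv.getRowOffset();  // where the row starts within 'backing'
        int rowLen = kv.getRowLength();
        // kv.getRow() would return an equivalent copy indexed from 0; applying
        // rowOff to that copy, as the old code effectively did, is incorrect.
        return java.util.Arrays.copyOfRange(backing, rowOff, rowOff + rowLen);
    }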

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f6f80b8/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index da8c212..81dfbb6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -332,7 +332,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         for (int i = 0; i < gps.size(); i++) {
             buf.append(Bytes.toStringBinary(gps.get(i)));
             buf.append(",");
-            if (i < gps.size()-1 && (i % 10) == 0) {
+            if (i+1 < gps.size() && ((i+1) % 10) == 0) {
                 buf.append("\n");
             }
         }
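
The one-line change above fixes an off-by-one in the guidepost log formatting: (i % 10) == 0 broke the line after the 1st, 11th, 21st, ... entries, whereas ((i+1) % 10) == 0 breaks after every 10th entry and never after the last. The corrected loop in isolation (items is a stand-in for the guidepost list):

    // Corrected pattern: comma-separated entries, a newline after every 10th
    // entry, and no newline after the final entry.
    static String format(java.util.List<String> items) {
        StringBuilder buf = new StringBuilder();
        for (int i = 0; i < items.size(); i++) {
            buf.append(items.get(i)).append(",");
            if (i + 1 < items.size() && (i + 1) % 10 == 0) {
                buf.append("\n");
            }
        }
        return buf.toString();
    }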

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f6f80b8/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 76f1b2a..79eaea0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1448,14 +1448,14 @@ public class MetaDataClient {
                                 tableRefs.add(new TableRef(null, viewIndexTable, ts, false));
                             }
                         }
+                        // Delete everything in the column. You'll still be able to do queries at earlier timestamps
+                        tableRefs.add(new TableRef(null, table, ts, false));
+                        // TODO: Let the standard mutable secondary index maintenance handle this?
+                        for (PTable index: table.getIndexes()) {
+                            tableRefs.add(new TableRef(null, index, ts, false));
+                        }
+                        deleteFromStatsTable(tableRefs, ts);
                         if (!dropMetaData) {
-                            // Delete everything in the column. You'll still be able to do queries at earlier timestamps
-                            tableRefs.add(new TableRef(null, table, ts, false));
-                            // TODO: Let the standard mutable secondary index maintenance handle this?
-                            for (PTable index: table.getIndexes()) {
-                                tableRefs.add(new TableRef(null, index, ts, false));
-                            }
-                            deleteFromStatsTable(tableRefs, ts);
                             MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null, Collections.<PColumn>emptyList(), ts);
                             return connection.getQueryServices().updateData(plan);
                         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f6f80b8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9ff65d0..bd9e9c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -247,7 +247,7 @@
                   <encoding>UTF-8</encoding>
                   <forkCount>${numForkedIT}</forkCount>
                   <reuseForks>true</reuseForks>
-                  <argLine>-enableassertions -Xmx2000m -XX:MaxPermSize=128m -Djava.security.egd=file:/dev/./urandom</argLine>
+                  <argLine>-enableassertions -Xmx2500m -XX:MaxPermSize=128m -Djava.security.egd=file:/dev/./urandom</argLine>
                   <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
                   <testSourceDirectory>${basedir}/src/it/java</testSourceDirectory>
                   <groups>org.apache.phoenix.end2end.HBaseManagedTimeTest</groups>