You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/04 05:51:00 UTC

[3/6] PHOENIX-1251 Salted queries with range scan become full table scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
index 8f9e553..4b20979 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
@@ -18,34 +18,22 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.analyzeTable;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
@@ -153,9 +141,8 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             assertTrue("Expected 1 row in result set", rs.next());
             assertEquals(2, rs.getInt(3));
             assertEquals("Viva Las Vegas", rs.getString(4));
-            conn1 = nextConnection(getUrl());
-            List<KeyRange> splits = getSplits(conn1, new Scan());
-            assertEquals(splits.size(), 5);
+            List<KeyRange> splits = getAllSplits(conn1, TENANT_TABLE_NAME);
+            assertEquals(3, splits.size());
         }
         finally {
             conn1.close();
@@ -490,10 +477,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
         }
     }
     
-    private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException {
-        String query = "ANALYZE " + tableName;
-        conn.createStatement().execute(query);
-    }
     @Test
     public void testUpsertValuesUsingViewWithNoWhereClause() throws Exception {
         Connection conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -508,34 +491,4 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
         assertFalse(rs.next());
         conn.close();
     }
-    private static List<KeyRange> getSplits(Connection conn, final Scan scan) throws SQLException {
-        TableRef tableRef = getTableRef(conn);
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(
-                tableRef.getTable().getPhysicalName().getBytes());
-        PhoenixStatement statement = new PhoenixStatement(pconn);
-        StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement));
-        DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef.getTable(),
-                HintNode.EMPTY_HINT_NODE) {
-            @Override
-            protected List<HRegionLocation> getAllRegions() throws SQLException {
-                return DefaultParallelIteratorRegionSplitter.filterRegions(regions, scan.getStartRow(),
-                        scan.getStopRow());
-            }
-        };
-        List<KeyRange> keyRanges = splitter.getSplits();
-        Collections.sort(keyRanges, new Comparator<KeyRange>() {
-            @Override
-            public int compare(KeyRange o1, KeyRange o2) {
-                return Bytes.compareTo(o1.getLowerRange(), o2.getLowerRange());
-            }
-        });
-        return keyRanges;
-    }
-    protected static TableRef getTableRef(Connection conn) throws SQLException {
-        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        TableRef table = new TableRef(null, pconn.getMetaDataCache().getTable(
-                new PTableKey(pconn.getTenantId(), PARENT_TABLE_NAME)), System.currentTimeMillis(), false);
-        return table;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
index d5e9d42..8f7912a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
@@ -145,7 +145,7 @@ public class SaltedIndexIT extends BaseIndexIT {
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         expectedPlan = indexSaltBuckets == null ? 
              "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [~'y']" : 
-            ("CLIENT PARALLEL 4-WAY SKIP SCAN ON 4 KEYS OVER " + INDEX_TABLE_FULL_NAME + " [0,~'y'] - [3,~'y']\n" + 
+            ("CLIENT PARALLEL 4-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [0,~'y']\n" + 
              "CLIENT MERGE SORT");
         assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs));
 
@@ -164,7 +164,7 @@ public class SaltedIndexIT extends BaseIndexIT {
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         expectedPlan = indexSaltBuckets == null ? 
             "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [*] - [~'x']" :
-            ("CLIENT PARALLEL 4-WAY SKIP SCAN ON 4 RANGES OVER " + INDEX_TABLE_FULL_NAME + " [0,*] - [3,~'x']\n" + 
+            ("CLIENT PARALLEL 4-WAY RANGE SCAN OVER " + INDEX_TABLE_FULL_NAME + " [0,*] - [0,~'x']\n" + 
              "CLIENT MERGE SORT");
         assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs));
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index a6ee92e..267aec9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.ImmutableSet;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
@@ -51,11 +49,14 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * 
  * Client for sending cache to each region server
@@ -145,15 +146,18 @@ public class ServerCacheClient {
         ExecutorService executor = services.getExecutor();
         List<Future<Boolean>> futures = Collections.emptyList();
         try {
-            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+            PTable cacheUsingTable = cacheUsingTableRef.getTable();
+            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
             int nRegions = locations.size();
             // Size these based on worst case
             futures = new ArrayList<Future<Boolean>>(nRegions);
             Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
             for (HRegionLocation entry : locations) {
                 // Keep track of servers we've sent to and only send once
+                byte[] regionStartKey = entry.getRegionInfo().getStartKey();
+                byte[] regionEndKey = entry.getRegionInfo().getEndKey();
                 if ( ! servers.contains(entry) && 
-                        keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) {  // Call RPC once per server
+                        keyRanges.intersects(regionStartKey, regionEndKey, 0) ) {
                     servers.add(entry);
                     if (LOG.isDebugEnabled()) {LOG.debug("Adding cache entry to be sent for " + entry);}
                     final byte[] key = entry.getRegionInfo().getStartKey();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index b5c14f0..271ba30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -64,4 +64,6 @@ public interface QueryPlan extends StatementPlan {
     FilterableStatement getStatement();
 
     public boolean isDegenerate();
+    
+    public boolean isRowKeyOrdered();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index dc8e0b3..1bd8cef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -23,12 +23,16 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -40,28 +44,40 @@ import com.google.common.collect.Lists;
 public class ScanRanges {
     private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
     private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
-    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, false, false);
-    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, false, false);
+    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, KeyRange.EVERYTHING_RANGE, KeyRange.EVERYTHING_RANGE, false, false, null);
+    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, KeyRange.EMPTY_RANGE, KeyRange.EMPTY_RANGE, false, false, null);
+    private static final Scan HAS_INTERSECTION = new Scan();
 
     public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan) {
-        return create(schema, ranges, slotSpan, false, null);
+        return create(schema, ranges, slotSpan, KeyRange.EVERYTHING_RANGE, false, null);
     }
     
-    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, boolean forceRangeScan, Integer nBuckets) {
-        int offset = nBuckets == null ? 0 : 1;
-        if (ranges.size() == offset) {
+    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, KeyRange minMaxRange, boolean forceRangeScan, Integer nBuckets) {
+        int offset = nBuckets == null ? 0 : SaltingUtil.NUM_SALTING_BYTES;
+        if (ranges.size() == offset && minMaxRange == KeyRange.EVERYTHING_RANGE) {
             return EVERYTHING;
-        } else if (ranges.size() == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE) {
+        } else if (minMaxRange == KeyRange.EMPTY_RANGE || (ranges.size() == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE)) {
             return NOTHING;
         }
         boolean isPointLookup = !forceRangeScan && ScanRanges.isPointLookup(schema, ranges, slotSpan);
         if (isPointLookup) {
-            // TODO: consider keeping original to use for serialization as it would
-            // be smaller?
+            // TODO: consider keeping original to use for serialization as it would be smaller?
             List<byte[]> keys = ScanRanges.getPointKeys(ranges, slotSpan, schema, nBuckets);
             List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size());
+            KeyRange unsaltedMinMaxRange = minMaxRange;
+            if (nBuckets != null && minMaxRange != KeyRange.EVERYTHING_RANGE) {
+                unsaltedMinMaxRange = KeyRange.getKeyRange(
+                        stripPrefix(minMaxRange.getLowerRange(),offset),
+                        minMaxRange.lowerUnbound(), 
+                        stripPrefix(minMaxRange.getUpperRange(),offset), 
+                        minMaxRange.upperUnbound());
+            }
             for (byte[] key : keys) {
-                keyRanges.add(KeyRange.getKeyRange(key));
+                // Filter now based on unsalted minMaxRange and ignore the point key salt byte
+                if ( unsaltedMinMaxRange.compareLowerToUpperBound(key, offset, key.length-offset, true) <= 0 &&
+                     unsaltedMinMaxRange.compareUpperToLowerBound(key, offset, key.length-offset, true) >= 0) {
+                    keyRanges.add(KeyRange.getKeyRange(key));
+                }
             }
             ranges = Collections.singletonList(keyRanges);
             if (keys.size() > 1) {
@@ -72,39 +88,286 @@ public class ScanRanges {
                 // when there's a single key.
                 slotSpan = new int[] {schema.getMaxFields()-1};
             }
-        } else if (nBuckets != null) {
-            List<List<KeyRange>> saltedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
-            saltedRanges.add(SaltingUtil.generateAllSaltingRanges(nBuckets));
-            saltedRanges.addAll(ranges.subList(1, ranges.size()));
-            ranges = saltedRanges;
         }
-        return new ScanRanges(schema, slotSpan, ranges, forceRangeScan, isPointLookup);
+        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
+        for (int i = 0; i < ranges.size(); i++) {
+            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
+            Collections.sort(sorted, KeyRange.COMPARATOR);
+            sortedRanges.add(ImmutableList.copyOf(sorted));
+        }
+        boolean useSkipScanFilter = useSkipScanFilter(forceRangeScan, isPointLookup, sortedRanges);
+        
+        // Don't set minMaxRange for point lookup because it causes issues during intersect
+        // by going across region boundaries
+        KeyRange scanRange = KeyRange.EVERYTHING_RANGE;
+        // if (!isPointLookup && (nBuckets == null || !useSkipScanFilter)) {
+        // if (! ( isPointLookup || (nBuckets != null && useSkipScanFilter) ) ) {
+        // if (nBuckets == null || (nBuckets != null && (!isPointLookup || !useSkipScanFilter))) {
+        if (nBuckets == null || !isPointLookup || !useSkipScanFilter) {
+            byte[] minKey = ScanUtil.getMinKey(schema, sortedRanges, slotSpan);
+            byte[] maxKey = ScanUtil.getMaxKey(schema, sortedRanges, slotSpan);
+            // If the maxKey has crossed the salt byte boundary, then we do not
+            // have anything to filter at the upper end of the range
+            if (ScanUtil.crossesPrefixBoundary(maxKey, ScanUtil.getPrefix(minKey, offset), offset)) {
+                maxKey = KeyRange.UNBOUND;
+            }
+            // We won't filter anything at the low end of the range if we just have the salt byte
+            if (minKey.length <= offset) {
+                minKey = KeyRange.UNBOUND;
+            }
+            scanRange = KeyRange.getKeyRange(minKey, maxKey);
+        }
+        if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
+            minMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable());
+            scanRange = scanRange.intersect(minMaxRange);
+        }
+        
+        if (scanRange == KeyRange.EMPTY_RANGE) {
+            return NOTHING;
+        }
+        return new ScanRanges(schema, slotSpan, sortedRanges, scanRange, minMaxRange, useSkipScanFilter, isPointLookup, nBuckets);
     }
 
     private SkipScanFilter filter;
     private final List<List<KeyRange>> ranges;
     private final int[] slotSpan;
     private final RowKeySchema schema;
-    private final boolean forceRangeScan;
     private final boolean isPointLookup;
+    private final boolean isSalted;
+    private final boolean useSkipScanFilter;
+    private final KeyRange scanRange;
+    private final KeyRange minMaxRange;
 
-    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, boolean forceRangeScan, boolean isPointLookup) {
+    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, KeyRange scanRange, KeyRange minMaxRange, boolean useSkipScanFilter, boolean isPointLookup, Integer bucketNum) {
         this.isPointLookup = isPointLookup;
-        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
-        for (int i = 0; i < ranges.size(); i++) {
-            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
-            Collections.sort(sorted, KeyRange.COMPARATOR);
-            sortedRanges.add(ImmutableList.copyOf(sorted));
+        this.isSalted = bucketNum != null;
+        this.useSkipScanFilter = useSkipScanFilter;
+        this.scanRange = scanRange;
+        this.minMaxRange = minMaxRange;
+        
+        // Only blow out the bucket values if we're using the skip scan. We need all the
+        // bucket values in this case because we use intersect against a key that may have
+        // any of the possible bucket values. Otherwise, we can pretty easily ignore the
+        // bucket values.
+        if (useSkipScanFilter && isSalted && !isPointLookup) {
+        	ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
         }
-        this.ranges = ImmutableList.copyOf(sortedRanges);
+        this.ranges = ImmutableList.copyOf(ranges);
         this.slotSpan = slotSpan;
         this.schema = schema;
-        if (schema != null && !ranges.isEmpty()) {
+        if (schema != null && !ranges.isEmpty()) { // TODO: only create if useSkipScanFilter is true?
             this.filter = new SkipScanFilter(this.ranges, slotSpan, schema);
         }
-        this.forceRangeScan = forceRangeScan;
+    }
+    
+    /**
+     * Get the minMaxRange that is applied in addition to the scan range.
+     * Only used by the ExplainTable to generate the explain plan.
+     */
+    public KeyRange getMinMaxRange() {
+        return minMaxRange;
+    }
+    
+    public void initializeScan(Scan scan) {
+        scan.setStartRow(scanRange.getLowerRange());
+        scan.setStopRow(scanRange.getUpperRange());
+    }
+    
+    private static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
+        if (key.length > 0) {
+            byte[] newKey = new byte[key.length + prefixKeyOffset];
+            int totalKeyOffset = keyOffset + prefixKeyOffset;
+            if (prefixKey.length >= totalKeyOffset) { // otherwise it's null padded
+                System.arraycopy(prefixKey, 0, newKey, 0, totalKeyOffset);
+            }
+            System.arraycopy(key, keyOffset, newKey, totalKeyOffset, key.length - keyOffset);
+            return newKey;
+        }
+        return key;
+    }
+    
+    private static byte[] replaceSaltByte(byte[] key, byte[] saltKey) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length];
+        if (saltKey.length >= SaltingUtil.NUM_SALTING_BYTES) { // Otherwise it's null padded
+            System.arraycopy(saltKey, 0, temp, 0, SaltingUtil.NUM_SALTING_BYTES);
+        }
+        System.arraycopy(key, SaltingUtil.NUM_SALTING_BYTES, temp, SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES);
+        return temp;
+    }
+    
+    private static byte[] stripPrefix(byte[] key, int keyOffset) {
+        if (key.length == 0) {
+            return key;
+        }
+        byte[] temp = new byte[key.length - keyOffset];
+        System.arraycopy(key, keyOffset, temp, 0, key.length - keyOffset);
+        return temp;
+    }
+    
+    public Scan intersectScan(Scan scan, final byte[] originalStartKey, final byte[] originalStopKey, final int keyOffset) {
+        byte[] startKey = originalStartKey;
+        byte[] stopKey = originalStopKey;
+        boolean mayHaveRows = false;
+        // Keep the keys as they are if we have a point lookup, as we've already resolved the
+        // salt bytes in that case.
+        final int scanKeyOffset = this.isSalted && !this.isPointLookup ? SaltingUtil.NUM_SALTING_BYTES : 0;
+        assert (scanKeyOffset == 0 || keyOffset == 0);
+        // Offset for startKey/stopKey. Either 1 for salted tables or the prefix length
+        // of the current region for local indexes.
+        final int totalKeyOffset = scanKeyOffset + keyOffset;
+        // In this case, we've crossed the "prefix" boundary and should consider everything after the startKey
+        // This prevents us from having to prefix the key prior to knowing whether or not there may be an
+        // intersection.
+        byte[] prefixBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (totalKeyOffset > 0) {
+            prefixBytes = ScanUtil.getPrefix(startKey, totalKeyOffset);
+            if (ScanUtil.crossesPrefixBoundary(stopKey, prefixBytes, totalKeyOffset)) {
+                stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+            }
+        }
+        int scanStartKeyOffset = scanKeyOffset;
+        byte[] scanStartKey = scan == null ? ByteUtil.EMPTY_BYTE_ARRAY : scan.getStartRow();
+        // Compare ignoring key prefix and salt byte
+        if (scanStartKey.length > 0) {
+            if (startKey.length > 0 && Bytes.compareTo(scanStartKey, scanKeyOffset, scanStartKey.length - scanKeyOffset, startKey, totalKeyOffset, startKey.length - totalKeyOffset) < 0) {
+                scanStartKey = startKey;
+                scanStartKeyOffset = totalKeyOffset;
+            }
+        } else {
+        	scanStartKey = startKey;
+            scanStartKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        int scanStopKeyOffset = scanKeyOffset;
+        byte[] scanStopKey = scan == null ? ByteUtil.EMPTY_BYTE_ARRAY : scan.getStopRow();
+        if (scanStopKey.length > 0) {
+            if (stopKey.length > 0 && Bytes.compareTo(scanStopKey, scanKeyOffset, scanStopKey.length - scanKeyOffset, stopKey, totalKeyOffset, stopKey.length - totalKeyOffset) > 0) {
+                scanStopKey = stopKey;
+                scanStopKeyOffset = totalKeyOffset;
+            }
+        } else {
+        	scanStopKey = stopKey;
+            scanStopKeyOffset = totalKeyOffset;
+            mayHaveRows = true;
+        }
+        mayHaveRows = mayHaveRows || Bytes.compareTo(scanStartKey, scanStartKeyOffset, scanStartKey.length - scanStartKeyOffset, scanStopKey, scanStopKeyOffset, scanStopKey.length - scanStopKeyOffset) < 0;
+        
+        if (!mayHaveRows) {
+            return null;
+        }
+        if (originalStopKey.length != 0 && scanStopKey.length == 0) {
+            scanStopKey = originalStopKey;
+        }
+        Filter newFilter = null;
+        // If the scan is using skip scan filter, intersect and replace the filter.
+        if (scan == null || this.useSkipScanFilter()) {
+            byte[] skipScanStartKey = scanStartKey;
+            byte[] skipScanStopKey = scanStopKey;
+            // If we have a keyOffset and we've used the startKey/stopKey that
+            // were passed in (which have the prefix) for the above range check,
+            // we need to remove the prefix before running our intersect method.
+            // TODO: we could use skipScanFilter.setOffset(keyOffset) if both
+            // the startKey and stopKey were used above *and* our intersect
+            // method honored the skipScanFilter.offset variable.
+            if (scanKeyOffset > 0) {
+                if (skipScanStartKey != originalStartKey) { // original already has correct salt byte
+                    skipScanStartKey = replaceSaltByte(skipScanStartKey, prefixBytes);
+                }
+                if (skipScanStopKey != originalStopKey) {
+                    skipScanStopKey = replaceSaltByte(skipScanStopKey, prefixBytes);
+                }
+            } else if (keyOffset > 0) {
+                if (skipScanStartKey == originalStartKey) {
+                    skipScanStartKey = stripPrefix(skipScanStartKey, keyOffset);
+                }
+                if (skipScanStopKey == originalStopKey) {
+                    skipScanStopKey = stripPrefix(skipScanStopKey, keyOffset);
+                }
+            }
+            if (scan == null) {
+                return filter.hasIntersect(skipScanStartKey, skipScanStopKey) ? HAS_INTERSECTION : null;
+            }
+            Filter filter = scan.getFilter();
+            SkipScanFilter newSkipScanFilter = null;
+            if (filter instanceof SkipScanFilter) {
+                SkipScanFilter oldSkipScanFilter = (SkipScanFilter)filter;
+                newFilter = newSkipScanFilter = oldSkipScanFilter.intersect(skipScanStartKey, skipScanStopKey);
+                if (newFilter == null) {
+                    return null;
+                }
+            } else if (filter instanceof FilterList) {
+                FilterList oldList = (FilterList)filter;
+                FilterList newList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+                newFilter = newList;
+                for (Filter f : oldList.getFilters()) {
+                    if (f instanceof SkipScanFilter) {
+                        newSkipScanFilter = ((SkipScanFilter)f).intersect(skipScanStartKey, skipScanStopKey);
+                        if (newSkipScanFilter == null) {
+                            return null;
+                        }
+                        newList.addFilter(newSkipScanFilter);
+                    } else {
+                        newList.addFilter(f);
+                    }
+                }
+            }
+            // TODO: it seems that our SkipScanFilter or HBase runs into problems if we don't
+            // have an enclosing range when we do a point lookup.
+            if (isPointLookup) {
+                scanStartKey = ScanUtil.getMinKey(schema, newSkipScanFilter.getSlots(), slotSpan);
+                scanStopKey = ScanUtil.getMaxKey(schema, newSkipScanFilter.getSlots(), slotSpan);
+            }
+        }
+        if (newFilter == null) {
+            newFilter = scan.getFilter();
+        }
+        Scan newScan = ScanUtil.newScan(scan);
+        newScan.setFilter(newFilter);
+        // If we have an offset (salted table or local index), we need to make sure to
+        // prefix our scan start/stop row by the prefix of the startKey or stopKey that
+        // were passed in. Our scan either doesn't have the prefix or has a placeholder
+        // for it.
+        if (totalKeyOffset > 0) {
+            if (scanStartKey != originalStartKey) {
+                scanStartKey = prefixKey(scanStartKey, scanKeyOffset, prefixBytes, keyOffset);
+            }
+            if (scanStopKey != originalStopKey) {
+                scanStopKey = prefixKey(scanStopKey, scanKeyOffset, prefixBytes, keyOffset);
+            }
+        }
+        newScan.setStartRow(scanStartKey);
+        newScan.setStopRow(scanStopKey);
+        
+        return newScan;
     }
 
+    /**
+     * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
+     * intersects with any of the scan ranges and false otherwise. We cannot pass in
+     * a KeyRange here, because the underlying compare functions expect lower inclusive
+     * and upper exclusive keys. We cannot get their next key because the key must
+     * conform to the row key schema and if a null byte is added to a lower inclusive
+     * key, it's no longer a valid, real key.
+     * @param lowerInclusiveKey lower inclusive key
+     * @param upperExclusiveKey upper exclusive key
+     * @return true if the scan range intersects with the specified lower/upper key
+     * range
+     */
+    public boolean intersects(byte[] lowerInclusiveKey, byte[] upperExclusiveKey, int keyOffset) {
+        if (isEverything()) {
+            return true;
+        }
+        if (isDegenerate()) {
+            return false;
+        }
+        
+        //return filter.hasIntersect(lowerInclusiveKey, upperExclusiveKey);
+        return intersectScan(null, lowerInclusiveKey, upperExclusiveKey, keyOffset) == HAS_INTERSECTION;
+    }
+    
     public SkipScanFilter getSkipScanFilter() {
         return filter;
     }
@@ -132,11 +395,15 @@ public class ScanRanges {
      *    not the last key slot
      */
     public boolean useSkipScanFilter() {
+        return useSkipScanFilter;
+    }
+    
+    private static boolean useSkipScanFilter(boolean forceRangeScan, boolean isPointLookup, List<List<KeyRange>> ranges) {
         if (forceRangeScan) {
             return false;
         }
         if (isPointLookup) {
-            return getPointLookupCount() > 1;
+            return getPointLookupCount(isPointLookup, ranges) > 1;
         }
         boolean hasRangeKey = false, useSkipScan = false;
         for (List<KeyRange> orRanges : ranges) {
@@ -208,6 +475,10 @@ public class ScanRanges {
     }
     
     public int getPointLookupCount() {
+        return getPointLookupCount(isPointLookup, ranges);
+    }
+    
+    private static int getPointLookupCount(boolean isPointLookup, List<List<KeyRange>> ranges) {
         return isPointLookup ? ranges.get(0).size() : 0;
     }
     
@@ -215,51 +486,6 @@ public class ScanRanges {
         return isPointLookup ? ranges.get(0).iterator() : Iterators.<KeyRange>emptyIterator();
     }
 
-    public void setScanStartStopRow(Scan scan) {
-        if (isEverything()) {
-            return;
-        }
-        if (isDegenerate()) {
-            scan.setStartRow(KeyRange.EMPTY_RANGE.getLowerRange());
-            scan.setStopRow(KeyRange.EMPTY_RANGE.getUpperRange());
-            return;
-        }
-        
-        byte[] expectedKey;
-        expectedKey = ScanUtil.getMinKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStartRow(expectedKey);
-        }
-        expectedKey = ScanUtil.getMaxKey(schema, ranges, slotSpan);
-        if (expectedKey != null) {
-            scan.setStopRow(expectedKey);
-        }
-    }
-
-    public static final ImmutableBytesWritable UNBOUND = new ImmutableBytesWritable(KeyRange.UNBOUND);
-
-    /**
-     * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
-     * intersects with any of the scan ranges and false otherwise. We cannot pass in
-     * a KeyRange here, because the underlying compare functions expect lower inclusive
-     * and upper exclusive keys. We cannot get their next key because the key must
-     * conform to the row key schema and if a null byte is added to a lower inclusive
-     * key, it's no longer a valid, real key.
-     * @param lowerInclusiveKey lower inclusive key
-     * @param upperExclusiveKey upper exclusive key
-     * @return true if the scan range intersects with the specified lower/upper key
-     * range
-     */
-    public boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
-        if (isEverything()) {
-            return true;
-        }
-        if (isDegenerate()) {
-            return false;
-        }
-        return filter.hasIntersect(lowerInclusiveKey, upperExclusiveKey);
-    }
-    
     public int getPkColumnSpan() {
         return this == ScanRanges.NOTHING ? 0 : ScanUtil.getTotalSpan(ranges, slotSpan);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 3cb6ce9..887ca3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -31,7 +31,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -39,7 +38,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
-import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Maps;
 
@@ -67,7 +65,6 @@ public class StatementContext {
     
     private long currentTime = QueryConstants.UNSET_TIMESTAMP;
     private ScanRanges scanRanges = ScanRanges.EVERYTHING;
-    private KeyRange minMaxRange = null;
     private final SequenceManager sequences; 
 
     private TableRef currentTable;
@@ -158,41 +155,8 @@ public class StatementContext {
     }
     
     public void setScanRanges(ScanRanges scanRanges) {
-        setScanRanges(scanRanges, null);
-    }
-
-    public void setScanRanges(ScanRanges scanRanges, KeyRange minMaxRange) {
         this.scanRanges = scanRanges;
-        this.scanRanges.setScanStartStopRow(scan);
-        PTable table = this.getCurrentTable().getTable();
-        if (minMaxRange != null) {
-            // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
-            // what we need to intersect against for the HBase scan.
-            byte[] lowerRange = minMaxRange.getLowerRange();
-            if (!minMaxRange.lowerUnbound()) {
-                if (!minMaxRange.isLowerInclusive()) {
-                    lowerRange = ScanUtil.nextKey(lowerRange, table, tempPtr);
-                }
-            }
-            
-            byte[] upperRange = minMaxRange.getUpperRange();
-            if (!minMaxRange.upperUnbound()) {
-                if (minMaxRange.isUpperInclusive()) {
-                    upperRange = ScanUtil.nextKey(upperRange, table, tempPtr);
-                }
-            }
-            if (minMaxRange.getLowerRange() != lowerRange || minMaxRange.getUpperRange() != upperRange) {
-                minMaxRange = KeyRange.getKeyRange(lowerRange, true, upperRange, false);
-            }
-            // If we're not salting, we can intersect this now with the scan range.
-            // Otherwise, we have to wait to do this when we chunk up the scan.
-            if (table.getBucketNum() == null) {
-                minMaxRange = minMaxRange.intersect(KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow()));
-                scan.setStartRow(minMaxRange.getLowerRange());
-                scan.setStopRow(minMaxRange.getUpperRange());
-            }
-            this.minMaxRange = minMaxRange;
-        }
+        scanRanges.initializeScan(scan);
     }
     
     public PhoenixConnection getConnection() {
@@ -224,14 +188,6 @@ public class StatementContext {
         return currentTime;
     }
 
-    /**
-     * Get the key range derived from row value constructor usage in where clause. These are orthogonal to the ScanRanges
-     * and form a range for which each scan is intersected against.
-     */
-    public KeyRange getMinMaxRange () {
-        return minMaxRange;
-    }
-    
     public SequenceManager getSequenceManager(){
         return sequences;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 9cd7e01..634bd15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -58,6 +58,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -146,14 +147,30 @@ public class WhereOptimizer {
         RowKeySchema schema = table.getRowKeySchema();
         List<List<KeyRange>> cnf = Lists.newArrayListWithExpectedSize(schema.getMaxFields());
         KeyRange minMaxRange = keySlots.getMinMaxRange();
-        boolean hasMinMaxRange = (minMaxRange != null);
+        if (minMaxRange == null) {
+            minMaxRange = KeyRange.EVERYTHING_RANGE;
+        }
+        boolean hasMinMaxRange = (minMaxRange != KeyRange.EVERYTHING_RANGE);
         int minMaxRangeOffset = 0;
         byte[] minMaxRangePrefix = null;
+        boolean isSalted = nBuckets != null;
+        boolean isMultiTenant = tenantId != null && table.isMultiTenant();
+        boolean hasViewIndex = table.getViewIndexId() != null;
+        if (hasMinMaxRange) {
+            int minMaxRangeSize = (isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0)
+                    + (isMultiTenant ? tenantId.getBytes().length + 1 : 0) 
+                    + (hasViewIndex ? MetaDataUtil.getViewIndexIdDataType().getByteSize() : 0);
+            minMaxRangePrefix = new byte[minMaxRangeSize];
+        }
         
         Iterator<KeyExpressionVisitor.KeySlot> iterator = keySlots.iterator();
         // Add placeholder for salt byte ranges
-        if (nBuckets != null) {
+        if (isSalted) {
             cnf.add(SALT_PLACEHOLDER);
+            if (hasMinMaxRange) {
+	            System.arraycopy(SALT_PLACEHOLDER.get(0).getLowerRange(), 0, minMaxRangePrefix, minMaxRangeOffset, SaltingUtil.NUM_SALTING_BYTES);
+	            minMaxRangeOffset += SaltingUtil.NUM_SALTING_BYTES;
+            }
             // Increment the pkPos, as the salt column is in the row schema
             // Do not increment the iterator, though, as there will never be
             // an expression in the keySlots for the salt column
@@ -161,13 +178,12 @@ public class WhereOptimizer {
         }
         
         // Add tenant data isolation for tenant-specific tables
-        if (tenantId != null && table.isMultiTenant()) {
+        if (isMultiTenant) {
             byte[] tenantIdBytes = tenantId.getBytes();
             KeyRange tenantIdKeyRange = KeyRange.getKeyRange(tenantIdBytes);
             cnf.add(singletonList(tenantIdKeyRange));
             if (hasMinMaxRange) {
-                minMaxRangePrefix = new byte[tenantIdBytes.length + MetaDataUtil.getViewIndexIdDataType().getByteSize() + 1];
-                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, 0, tenantIdBytes.length);
+                System.arraycopy(tenantIdBytes, 0, minMaxRangePrefix, minMaxRangeOffset, tenantIdBytes.length);
                 minMaxRangeOffset += tenantIdBytes.length;
                 if (!schema.getField(pkPos).getDataType().isFixedWidth()) {
                     minMaxRangePrefix[minMaxRangeOffset] = QueryConstants.SEPARATOR_BYTE;
@@ -178,14 +194,11 @@ public class WhereOptimizer {
         }
         // Add unique index ID for shared indexes on views. This ensures
         // that different indexes don't interleave.
-        if (table.getViewIndexId() != null) {
+        if (hasViewIndex) {
             byte[] viewIndexBytes = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
             KeyRange indexIdKeyRange = KeyRange.getKeyRange(viewIndexBytes);
             cnf.add(singletonList(indexIdKeyRange));
             if (hasMinMaxRange) {
-                if (minMaxRangePrefix == null) {
-                    minMaxRangePrefix = new byte[viewIndexBytes.length];
-                }
                 System.arraycopy(viewIndexBytes, 0, minMaxRangePrefix, minMaxRangeOffset, viewIndexBytes.length);
                 minMaxRangeOffset += viewIndexBytes.length;
             }
@@ -194,7 +207,7 @@ public class WhereOptimizer {
         
         // Prepend minMaxRange with fixed column values so we can properly intersect the
         // range with the other range.
-        if (minMaxRange != null) {
+        if (hasMinMaxRange) {
             minMaxRange = minMaxRange.prependRange(minMaxRangePrefix, 0, minMaxRangeOffset);
         }
         boolean forcedSkipScan = statement.getHint().hasHint(Hint.SKIP_SCAN);
@@ -285,9 +298,9 @@ public class WhereOptimizer {
         // If we have fully qualified point keys with multi-column spans (i.e. RVC),
         // we can still use our skip scan. The ScanRanges.create() call will explode
         // out the keys.
-        context.setScanRanges(
-                ScanRanges.create(schema, cnf, Arrays.copyOf(slotSpan, cnf.size()), forcedRangeScan, nBuckets),
-                minMaxRange);
+        slotSpan = Arrays.copyOf(slotSpan, cnf.size());
+        ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpan, minMaxRange, forcedRangeScan, nBuckets);
+        context.setScanRanges(scanRanges);
         if (whereClause == null) {
             return null;
         } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index a1d943c..f1f05be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -110,7 +110,6 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.stat.PTableStats;
 import org.apache.phoenix.schema.stat.PTableStatsImpl;
-import org.apache.phoenix.schema.stat.StatisticsUtils;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -239,8 +238,14 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         int length = getVarCharLength(keyBuffer, keyOffset, keyLength);
         return PNameFactory.newName(keyBuffer, keyOffset, length);
     }
-    
-    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp) throws IOException {
+
+    private static Scan newTableRowsScan(byte[] key)
+            throws IOException {
+        return newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP, HConstants.LATEST_TIMESTAMP);
+    }
+
+    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp)
+            throws IOException {
         Scan scan = new Scan();
         scan.setTimeRange(startTimeStamp, stopTimeStamp);
         scan.setStartRow(key);
@@ -454,7 +459,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         }
         PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getTableName(
                 schemaName.getString(), tableName.getString())) : physicalTables.get(0);
-        PTableStats stats = updateStatsInternal(physicalTableName.getBytes(), columns);
+        PTableStats stats = tenantId == null ? updateStatsInternal(physicalTableName.getBytes()) : null;
         return PTableImpl
                 .makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName,
                         saltBucketNum, columns, tableType == INDEX ? dataTableName : null, indexes, isImmutableRows,
@@ -462,62 +467,48 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                         viewIndexId, stats);
     }
     
-    private PTableStats updateStatsInternal(byte[] tableNameBytes, List<PColumn> columns) throws IOException {
-        List<PName> family = Lists.newArrayListWithExpectedSize(columns.size());
-        for (PColumn column : columns) {
-            PName familyName = column.getFamilyName();
-            if (familyName != null) {
-                family.add(familyName);
-            }
-        }
+    private PTableStats updateStatsInternal(byte[] tableNameBytes) throws IOException {
         HTable statsHTable = null;
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         try {
             // Can we do a new HTable instance here? Or get it from a pool or cache of these instances?
             statsHTable = new HTable(getEnvironment().getConfiguration(), PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
-            Scan s = new Scan();
-            if (tableNameBytes != null) {
-                // Check for an efficient way here
-                s.setStartRow(tableNameBytes);
-                s.setStopRow(ByteUtil.nextKey(tableNameBytes));
-            }
+            Scan s = newTableRowsScan(tableNameBytes);
+            s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
             ResultScanner scanner = statsHTable.getScanner(s);
             Result result = null;
-            byte[] fam = null;
-            List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(columns.size());
             TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
             while ((result = scanner.next()) != null) {
-                KeyValue[] kvs = result.raw();
-                for(KeyValue kv : kvs) {
-                    // For now collect only guide posts
-                    if (Bytes.equals(kv.getQualifier(), PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES)) {
-                        byte[] cfInCell = StatisticsUtils.getCFFromRowKey(tableNameBytes, kv.getRow());
-                        if (fam == null) {
-                            fam = cfInCell;
-                        } else if (!Bytes.equals(fam, cfInCell)) {
-                            // Sort all the guide posts
-                            guidePostsPerCf.put(cfInCell, guidePosts);
-                            guidePosts = new ArrayList<byte[]>();
-                            fam = cfInCell;
-                        }
-                        byte[] guidePostVal = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(),
-                                kv.getValueLength()).copyBytesIfNecessary();
-                        PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePostVal);
-                        if (array != null && array.getDimensions() != 0) {
-                            for (int j = 0; j < array.getDimensions(); j++) {
-                                byte[] gp = array.toBytes(j);
-                                if (gp.length != 0) {
-                                    guidePosts.add(gp);
-                                }
-                            }
+                KeyValue current = result.raw()[0];
+                int tableNameLength = tableNameBytes.length + 1;
+                int cfOffset = current.getRowOffset() + tableNameLength;
+                int cfLength = getVarCharLength(current.getRow(), cfOffset, current.getRowLength() - tableNameLength);
+                ptr.set(current.getRow(), cfOffset, cfLength);
+                byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getValue(), current.getValueOffset(), current
+                        .getValueLength());
+                if (array != null && array.getDimensions() != 0) {
+                    List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions());                        
+                    for (int j = 0; j < array.getDimensions(); j++) {
+                        byte[] gp = array.toBytes(j);
+                        if (gp.length != 0) {
+                            guidePosts.add(gp);
                         }
                     }
+                    List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts);
+                    if (gps != null) { // Add guidepost already there from other regions
+                        guidePosts.addAll(gps);
+                    }
                 }
             }
-            if (fam != null) {
-                // Sort all the guideposts
-                guidePostsPerCf.put(fam, guidePosts);
+            if (!guidePostsPerCf.isEmpty()) {
+                // Sort guideposts, as the order above will depend on the order we traverse
+                // each region's worth of guideposts above.
+                for (List<byte[]> gps : guidePostsPerCf.values()) {
+                    Collections.sort(gps, Bytes.BYTES_COMPARATOR);
+                }
+                return new PTableStatsImpl(guidePostsPerCf);
             }
-            return new PTableStatsImpl(guidePostsPerCf);
         } catch (Exception e) {
             if (e instanceof org.apache.hadoop.hbase.TableNotFoundException) {
                 logger.warn("Stats table not yet online", e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index d45e036..9f294a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -65,7 +65,7 @@ import org.apache.phoenix.schema.TableRef;
  * 
  * @since 0.1
  */
-public class AggregatePlan extends BasicQueryPlan {
+public class AggregatePlan extends BaseQueryPlan {
     private final Aggregators aggregators;
     private final Expression having;
     private List<KeyRange> splits;
@@ -164,7 +164,7 @@ public class AggregatePlan extends BasicQueryPlan {
              */
             context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, PDataType.INTEGER.toBytes(limit));
         }
-        ParallelIterators parallelIterators = new ParallelIterators(context, tableRef, statement, projection, groupBy, null, wrapParallelIteratorFactory());
+        ParallelIterators parallelIterators = new ParallelIterators(this, null, wrapParallelIteratorFactory());
         splits = parallelIterators.getSplits();
 
         AggregatingResultIterator aggResultIterator;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
new file mode 100644
index 0000000..eb43aa4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ScanUtil;
+
+import com.google.common.collect.Lists;
+
+
+
+/**
+ *
+ * Query plan that has no child plans
+ *
+ * 
+ * @since 0.1
+ */
+public abstract class BaseQueryPlan implements QueryPlan {
+    protected static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K
+    
+    protected final TableRef tableRef;
+    protected final StatementContext context;
+    protected final FilterableStatement statement;
+    protected final RowProjector projection;
+    protected final ParameterMetaData paramMetaData;
+    protected final Integer limit;
+    protected final OrderBy orderBy;
+    protected final GroupBy groupBy;
+    protected final ParallelIteratorFactory parallelIteratorFactory;
+
+    protected BaseQueryPlan(
+            StatementContext context, FilterableStatement statement, TableRef table,
+            RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
+            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
+        this.context = context;
+        this.statement = statement;
+        this.tableRef = table;
+        this.projection = projection;
+        this.paramMetaData = paramMetaData;
+        this.limit = limit;
+        this.orderBy = orderBy;
+        this.groupBy = groupBy;
+        this.parallelIteratorFactory = parallelIteratorFactory;
+    }
+
+    @Override
+    public boolean isDegenerate() {
+        return context.getScanRanges() == ScanRanges.NOTHING;
+
+    }
+    
+    @Override
+    public GroupBy getGroupBy() {
+        return groupBy;
+    }
+
+    
+    @Override
+    public OrderBy getOrderBy() {
+        return orderBy;
+    }
+
+    @Override
+    public TableRef getTableRef() {
+        return tableRef;
+    }
+
+    @Override
+    public Integer getLimit() {
+        return limit;
+    }
+
+    @Override
+    public RowProjector getProjector() {
+        return projection;
+    }
+
+//    /**
+//     * Sets up an id used to do round robin queue processing on the server
+//     * @param scan
+//     */
+//    private void setProducer(Scan scan) {
+//        byte[] producer = Bytes.toBytes(UUID.randomUUID().toString());
+//        scan.setAttribute(HBaseServer.CALL_QUEUE_PRODUCER_ATTRIB_NAME, producer);
+//    }
+    
+    @Override
+    public final ResultIterator iterator() throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList());
+    }
+
+    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
+        if (context.getScanRanges() == ScanRanges.NOTHING) {
+            return ResultIterator.EMPTY_ITERATOR;
+        }
+        
+        Scan scan = context.getScan();
+        // Set producer on scan so HBase server does round robin processing
+        //setProducer(scan);
+        // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
+        // The time stamp comes from the server at compile time when the meta data
+        // is resolved.
+        // TODO: include time range in explain plan?
+        PhoenixConnection connection = context.getConnection();
+        if (context.getScanTimeRange() == null) {
+          Long scn = connection.getSCN();
+          if (scn == null) {
+            scn = context.getCurrentTime();
+            // Add one to server time since max of time range is exclusive
+            // and we need to account of OSs with lower resolution clocks.
+            if (scn < HConstants.LATEST_TIMESTAMP) {
+              scn++;
+            }
+          }
+          ScanUtil.setTimeRange(scan, scn);
+        } else {
+          try {
+            scan.setTimeRange(context.getScanTimeRange().getMin(), context.getScanTimeRange().getMax());
+          } catch (IOException e) {
+            throw new SQLException(e);
+          }
+        }
+        ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes());
+        ResultIterator iterator = newIterator();
+        return dependencies.isEmpty() ? 
+                iterator : new DelegateResultIterator(iterator) {
+            @Override
+            public void close() throws SQLException {
+                try {
+                    super.close();
+                } finally {
+                    SQLCloseables.closeAll(dependencies);
+                }
+            }
+        };
+    }
+
+    abstract protected ResultIterator newIterator() throws SQLException;
+    
+    @Override
+    public long getEstimatedSize() {
+        return DEFAULT_ESTIMATED_SIZE;
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() {
+        return paramMetaData;
+    }
+
+    @Override
+    public FilterableStatement getStatement() {
+        return statement;
+    }
+
+    @Override
+    public StatementContext getContext() {
+        return context;
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        if (context.getScanRanges() == ScanRanges.NOTHING) {
+            return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + tableRef.getTable().getName().getString()));
+        }
+        
+        // Optimize here when getting explain plan, as queries don't get optimized until after compilation
+        QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
+        ResultIterator iterator = plan.iterator();
+        List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
+        iterator.explain(planSteps);
+        return new ExplainPlan(planSteps);
+    }
+
+    @Override
+    public boolean isRowKeyOrdered() {
+        return groupBy.isEmpty() ? orderBy.getOrderByExpressions().isEmpty() : groupBy.isOrderPreserving();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
deleted file mode 100644
index 1aa3892..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import java.io.IOException;
-import java.sql.ParameterMetaData;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.phoenix.compile.ExplainPlan;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.DelegateResultIterator;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.SQLCloseable;
-import org.apache.phoenix.util.SQLCloseables;
-import org.apache.phoenix.util.ScanUtil;
-
-import com.google.common.collect.Lists;
-
-
-
-/**
- *
- * Query plan that has no child plans
- *
- * 
- * @since 0.1
- */
-public abstract class BasicQueryPlan implements QueryPlan {
-    protected static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K
-    
-    protected final TableRef tableRef;
-    protected final StatementContext context;
-    protected final FilterableStatement statement;
-    protected final RowProjector projection;
-    protected final ParameterMetaData paramMetaData;
-    protected final Integer limit;
-    protected final OrderBy orderBy;
-    protected final GroupBy groupBy;
-    protected final ParallelIteratorFactory parallelIteratorFactory;
-
-    protected BasicQueryPlan(
-            StatementContext context, FilterableStatement statement, TableRef table,
-            RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
-            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
-        this.context = context;
-        this.statement = statement;
-        this.tableRef = table;
-        this.projection = projection;
-        this.paramMetaData = paramMetaData;
-        this.limit = limit;
-        this.orderBy = orderBy;
-        this.groupBy = groupBy;
-        this.parallelIteratorFactory = parallelIteratorFactory;
-    }
-
-    @Override
-    public boolean isDegenerate() {
-        return context.getScanRanges() == ScanRanges.NOTHING;
-
-    }
-    
-    @Override
-    public GroupBy getGroupBy() {
-        return groupBy;
-    }
-
-    
-    @Override
-    public OrderBy getOrderBy() {
-        return orderBy;
-    }
-
-    @Override
-    public TableRef getTableRef() {
-        return tableRef;
-    }
-
-    @Override
-    public Integer getLimit() {
-        return limit;
-    }
-
-    @Override
-    public RowProjector getProjector() {
-        return projection;
-    }
-
-//    /**
-//     * Sets up an id used to do round robin queue processing on the server
-//     * @param scan
-//     */
-//    private void setProducer(Scan scan) {
-//        byte[] producer = Bytes.toBytes(UUID.randomUUID().toString());
-//        scan.setAttribute(HBaseServer.CALL_QUEUE_PRODUCER_ATTRIB_NAME, producer);
-//    }
-    
-    @Override
-    public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList());
-    }
-
-    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
-        if (context.getScanRanges() == ScanRanges.NOTHING) {
-            return ResultIterator.EMPTY_ITERATOR;
-        }
-        
-        Scan scan = context.getScan();
-        // Set producer on scan so HBase server does round robin processing
-        //setProducer(scan);
-        // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
-        // The time stamp comes from the server at compile time when the meta data
-        // is resolved.
-        // TODO: include time range in explain plan?
-        PhoenixConnection connection = context.getConnection();
-        if (context.getScanTimeRange() == null) {
-          Long scn = connection.getSCN();
-          if (scn == null) {
-            scn = context.getCurrentTime();
-            // Add one to server time since max of time range is exclusive
-            // and we need to account of OSs with lower resolution clocks.
-            if (scn < HConstants.LATEST_TIMESTAMP) {
-              scn++;
-            }
-          }
-          ScanUtil.setTimeRange(scan, scn);
-        } else {
-          try {
-            scan.setTimeRange(context.getScanTimeRange().getMin(), context.getScanTimeRange().getMax());
-          } catch (IOException e) {
-            throw new SQLException(e);
-          }
-        }
-        ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes());
-        ResultIterator iterator = newIterator();
-        return dependencies.isEmpty() ? 
-                iterator : new DelegateResultIterator(iterator) {
-            @Override
-            public void close() throws SQLException {
-                try {
-                    super.close();
-                } finally {
-                    SQLCloseables.closeAll(dependencies);
-                }
-            }
-        };
-    }
-
-    abstract protected ResultIterator newIterator() throws SQLException;
-    
-    @Override
-    public long getEstimatedSize() {
-        return DEFAULT_ESTIMATED_SIZE;
-    }
-
-    @Override
-    public ParameterMetaData getParameterMetaData() {
-        return paramMetaData;
-    }
-
-    @Override
-    public FilterableStatement getStatement() {
-        return statement;
-    }
-
-    @Override
-    public StatementContext getContext() {
-        return context;
-    }
-
-    @Override
-    public ExplainPlan getExplainPlan() throws SQLException {
-        if (context.getScanRanges() == ScanRanges.NOTHING) {
-            return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + tableRef.getTable().getName().getString()));
-        }
-        
-        // Optimize here when getting explain plan, as queries don't get optimized until after compilation
-        QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
-        ResultIterator iterator = plan.iterator();
-        List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
-        iterator.explain(planSteps);
-        return new ExplainPlan(planSteps);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 0e8eb50..80c4727 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -30,7 +30,7 @@ import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.*;
 import org.apache.phoenix.schema.TableRef;
 
-public class DegenerateQueryPlan extends BasicQueryPlan {
+public class DegenerateQueryPlan extends BaseQueryPlan {
 
     public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
         super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 66ad235..5ffcaeb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -79,7 +79,7 @@ public class HashJoinPlan implements QueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
 
     private final FilterableStatement statement;
-    private final BasicQueryPlan plan;
+    private final BaseQueryPlan plan;
     private final HashJoinInfo joinInfo;
     private final SubPlan[] subPlans;
     private final boolean recompileWhereClause;
@@ -93,8 +93,8 @@ public class HashJoinPlan implements QueryPlan {
     
     public static HashJoinPlan create(FilterableStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
-        if (plan instanceof BasicQueryPlan)
-            return new HashJoinPlan(statement, (BasicQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
+        if (plan instanceof BaseQueryPlan)
+            return new HashJoinPlan(statement, (BaseQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
         
         assert (plan instanceof HashJoinPlan);
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
@@ -111,7 +111,7 @@ public class HashJoinPlan implements QueryPlan {
     }
     
     private HashJoinPlan(FilterableStatement statement, 
-            BasicQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
+            BaseQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
         this.statement = statement;
         this.plan = plan;
         this.joinInfo = joinInfo;
@@ -462,6 +462,11 @@ public class HashJoinPlan implements QueryPlan {
         }
         
     }
+
+    @Override
+    public boolean isRowKeyOrdered() {
+        return plan.isRowKeyOrdered();
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 03deca7..dfb8fec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -56,12 +56,12 @@ import org.apache.phoenix.util.ScanUtil;
  *
  * @since 0.1
  */
-public class ScanPlan extends BasicQueryPlan {
+public class ScanPlan extends BaseQueryPlan {
     private List<KeyRange> splits;
     private boolean allowPageFilter;
 
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
-        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null,
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
                         buildResultIteratorFactory(context, table, orderBy));
         this.allowPageFilter = allowPageFilter;
@@ -110,7 +110,7 @@ public class ScanPlan extends BasicQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        ParallelIterators iterators = new ParallelIterators(context, tableRef, statement, projection, GroupBy.EMPTY_GROUP_BY, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
+        ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
         splits = iterators.getSplits();
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
index 1bcfcd0..adeb2ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
@@ -105,7 +105,7 @@ public class SkipScanFilter extends FilterBase {
     }
 
     // Exposed for testing.
-    List<List<KeyRange>> getSlots() {
+    public List<List<KeyRange>> getSlots() {
         return slots;
     }
 
@@ -178,14 +178,35 @@ public class SkipScanFilter extends FilterBase {
             schema.next(ptr, 0, schema.iterator(upperExclusiveKey,ptr), slotSpan[0]);
             endPos = ScanUtil.searchClosestKeyRangeWithUpperHigherThanPtr(slots.get(0), ptr, startPos);
             // Upper range lower than first lower range of first slot, so cannot possibly be in range
-            if (endPos == 0 && Bytes.compareTo(upperExclusiveKey, slots.get(0).get(0).getLowerRange()) <= 0) {
-                return false;
-            }
+//            if (endPos == 0 && Bytes.compareTo(upperExclusiveKey, slots.get(0).get(0).getLowerRange()) <= 0) {
+//                return false;
+//            }
             // Past last position, so we can include everything from the start position
             if (endPos >= slots.get(0).size()) {
                 upperUnbound = true;
                 endPos = slots.get(0).size()-1;
+            } else if (slots.get(0).get(endPos).compareLowerToUpperBound(upperExclusiveKey) >= 0) {
+                // We know that the endPos range is higher than the previous range, but we need
+                // to test if it ends before the next range starts.
+                endPos--;
+            }
+            if (endPos < startPos) {
+                return false;
+            }
+            
+        }
+        // Short circuit out if we only have a single set of keys
+        if (slots.size() == 1) {
+//            int offset = slots.get(0).get(endPos).compareLowerToUpperBound(upperExclusiveKey) < 0 ? 1 : 0;
+//            if (endPos + offset <= startPos) {
+//                return false;
+//            }
+//            List<KeyRange> newRanges = slots.get(0).subList(startPos, endPos + offset);
+            if (newSlots != null) {
+                List<KeyRange> newRanges = slots.get(0).subList(startPos, endPos+1);
+                newSlots.add(newRanges);
             }
+            return true;
         }
         if (!lowerUnbound) {
             position[0] = startPos;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index f486a47..8822e49 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -59,7 +59,6 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
         }
         Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
         ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-        scanRanges.setScanStartStopRow(scan);
         scan.setFilter(scanRanges.getSkipScanFilter());
         HRegion region = this.env.getRegion();
         RegionScanner scanner = region.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88c6abb0/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
deleted file mode 100644
index b8135d1..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-
-/**
- * Default strategy for splitting regions in ParallelIterator. Refactored from the
- * original version.
- * 
- * 
- * 
- */
-public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRegionSplitter {
-
-    protected final long guidePostsDepth;
-    protected final StatementContext context;
-    protected final PTable table;
-
-    private static final Logger logger = LoggerFactory.getLogger(DefaultParallelIteratorRegionSplitter.class);
-    public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, PTable table, HintNode hintNode) {
-        return new DefaultParallelIteratorRegionSplitter(context, table, hintNode);
-    }
-
-    protected DefaultParallelIteratorRegionSplitter(StatementContext context, PTable table, HintNode hintNode) {
-        this.context = context;
-        this.table = table;
-        ReadOnlyProps props = context.getConnection().getQueryServices().getProps();
-        this.guidePostsDepth = props.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
-                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
-    }
-
-    // Get the mapping between key range and the regions that contains them.
-    protected List<HRegionLocation> getAllRegions() throws SQLException {
-        Scan scan = context.getScan();
-        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices()
-                .getAllTableRegions(table.getPhysicalName().getBytes());
-        // If we're not salting, then we've already intersected the minMaxRange with the scan range
-        // so there's nothing to do here.
-        return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
-    }
-
-    /**
-     * Filters out regions that intersect with key range specified by the startKey and stopKey
-     * @param allTableRegions all region infos for a given table
-     * @param startKey the lower bound of key range, inclusive
-     * @param stopKey the upper bound of key range, inclusive
-     * @return regions that intersect with the key range given by the startKey and stopKey
-     */
-    // exposed for tests
-    public static List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, byte[] startKey, byte[] stopKey) {
-        Iterable<HRegionLocation> regions;
-        final KeyRange keyRange = KeyRange.getKeyRange(startKey, true, stopKey, false);
-        if (keyRange == KeyRange.EVERYTHING_RANGE) {
-            return allTableRegions;
-        }
-        
-        regions = Iterables.filter(allTableRegions, new Predicate<HRegionLocation>() {
-            @Override
-            public boolean apply(HRegionLocation location) {
-                KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location
-                        .getRegionInfo().getEndKey());
-                return keyRange.intersect(regionKeyRange) != KeyRange.EMPTY_RANGE;
-            }
-        });
-        return Lists.newArrayList(regions);
-    }
-
-    protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
-        if (regions.isEmpty()) { return Collections.emptyList(); }
-        Scan scan = context.getScan();
-        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
-        List<byte[]> gps = Lists.newArrayList();
-        if (!ScanUtil.isAnalyzeTable(scan)) {
-            if (table.getColumnFamilies().isEmpty()) {
-                // For sure we can get the defaultCF from the table
-                gps = table.getGuidePosts();
-            } else {
-                try {
-                    if (scan.getFamilyMap().size() > 0) {
-                        if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
-                            gps = table.getColumnFamily(defaultCF).getGuidePosts();
-                        } else { // Otherwise, just use first CF in use by scan
-                            gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
-                        }
-                    } else {
-                        gps = table.getColumnFamily(defaultCF).getGuidePosts();
-                    }
-                } catch (ColumnFamilyNotFoundException cfne) {
-                    // Alter table does this
-                }
-            }
-
-        }
-
-        List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size());
-        byte[] currentKey = regions.get(0).getRegionInfo().getStartKey();
-        byte[] endKey = null;
-        int regionIndex = 0;
-        int guideIndex = 0;
-        int gpsSize = gps.size();
-        int regionSize = regions.size();
-        if (currentKey.length > 0) {
-            guideIndex = Collections.binarySearch(gps, currentKey, Bytes.BYTES_COMPARATOR);
-            guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
-        }
-        // Merge bisect with guideposts for all but the last region
-        while (regionIndex < regionSize) {
-            byte[] currentGuidePost;
-            currentKey = regions.get(regionIndex).getRegionInfo().getStartKey();
-            endKey = regions.get(regionIndex++).getRegionInfo().getEndKey();
-            while (guideIndex < gpsSize
-                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
-                KeyRange keyRange = KeyRange.getKeyRange(currentKey, currentGuidePost);
-                if (keyRange != KeyRange.EMPTY_RANGE) {
-                    guidePosts.add(keyRange);
-                }
-                currentKey = currentGuidePost;
-                guideIndex++;
-            }
-            KeyRange keyRange = KeyRange.getKeyRange(currentKey, endKey);
-            if (keyRange != KeyRange.EMPTY_RANGE) {
-                guidePosts.add(keyRange);
-            }
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("The captured guideposts are: " + guidePosts);
-        }
-        return guidePosts;
-    }
-
-    @Override
-    public List<KeyRange> getSplits() throws SQLException {
-        return genKeyRanges(getAllRegions());
-    }
-}