Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/05/04 13:20:58 UTC

[1/3] phoenix git commit: PHOENIX-2628 Ensure split when iterating through results handled correctly(Rajeshbabu)

Repository: phoenix
Updated Branches:
  refs/heads/master 99713a61c -> d700c1f03


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 696f051..f9e9913 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -20,6 +20,9 @@ package org.apache.phoenix.util;
 import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
 import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -32,6 +35,7 @@ import java.util.NavigableSet;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -103,6 +107,10 @@ public class ScanUtil {
         return scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX) != null;
     }
 
+    public static boolean isNonAggregateScan(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
+    }
+
     // Use getTenantId and pass in column name to match against
     // in as PSchema attribute. If column name matches in 
     // KeyExpressions, set on scan as attribute
@@ -618,6 +626,62 @@ public class ScanUtil {
         }
     }
 
+    /**
+     * Prefix the region start key to the start row/stop row suffixes and set them as the scan boundaries.
+     * @param scan
+     * @param lowerInclusiveRegionKey
+     * @param upperExclusiveRegionKey
+     */
+    public static void setupLocalIndexScan(Scan scan, byte[] lowerInclusiveRegionKey,
+            byte[] upperExclusiveRegionKey) {
+        byte[] prefix = lowerInclusiveRegionKey.length == 0 ? new byte[upperExclusiveRegionKey.length]: lowerInclusiveRegionKey;
+        int prefixLength = lowerInclusiveRegionKey.length == 0? upperExclusiveRegionKey.length: lowerInclusiveRegionKey.length;
+        if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) {
+            scan.setStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 0, prefix, prefixLength));
+        }
+        if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) {
+            scan.setStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 0, prefix, prefixLength));
+        }
+    }
+
+    public static byte[] getActualStartRow(Scan localIndexScan, HRegionInfo regionInfo) {
+        return localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX) == null ? localIndexScan
+                .getStartRow() : ScanRanges.prefixKey(localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX), 0 ,
+            regionInfo.getStartKey().length == 0 ? new byte[regionInfo.getEndKey().length]
+                    : regionInfo.getStartKey(),
+            regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo
+                    .getStartKey().length);
+    }
+
+    /**
+     * Set all required attributes and boundaries for a local index scan.
+     * @param keyOffset
+     * @param regionStartKey
+     * @param regionEndKey
+     * @param newScan
+     */
+    public static void setLocalIndexAttributes(Scan newScan, int keyOffset, byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] stopRowSuffix) {
+        if(ScanUtil.isLocalIndex(newScan)) {
+             newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey);
+             newScan.setStartRow(regionStartKey);
+             newScan.setStopRow(regionEndKey);
+             if (keyOffset > 0 ) {
+                 newScan.setAttribute(SCAN_START_ROW_SUFFIX, ScanRanges.stripPrefix(startRowSuffix, keyOffset));
+             } else {
+                 newScan.setAttribute(SCAN_START_ROW_SUFFIX, startRowSuffix);
+             }
+             if (keyOffset > 0) {
+                 newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, ScanRanges.stripPrefix(stopRowSuffix, keyOffset));
+             } else {
+                 newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, stopRowSuffix);
+             }
+         }
+    }
+
+    public static boolean isConextScan(Scan scan, StatementContext context) {
+        return Bytes.compareTo(context.getScan().getStartRow(), scan.getStartRow()) == 0 && Bytes
+                .compareTo(context.getScan().getStopRow(), scan.getStopRow()) == 0;
+    }
     public static int getRowKeyOffset(byte[] regionStartKey, byte[] regionEndKey) {
         return regionStartKey.length > 0 ? regionStartKey.length : regionEndKey.length;
     }

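For illustration only (not part of the commit): a minimal, self-contained sketch of the key composition that setupLocalIndexScan performs, using plain byte-array copies in place of Phoenix's ScanRanges.prefixKey. The class and method names below are hypothetical.

    import java.util.Arrays;

    public class LocalIndexScanKeyExample {
        // Compose a scan boundary by prefixing the region start key to a stored row suffix.
        // For the first region the start key is empty, so a zero-filled prefix of the end
        // key's length is used instead, mirroring the logic in setupLocalIndexScan.
        static byte[] prefixKey(byte[] rowSuffix, byte[] regionStartKey, byte[] regionEndKey) {
            byte[] prefix = regionStartKey.length == 0
                    ? new byte[regionEndKey.length] : regionStartKey;
            byte[] key = new byte[prefix.length + rowSuffix.length];
            System.arraycopy(prefix, 0, key, 0, prefix.length);
            System.arraycopy(rowSuffix, 0, key, prefix.length, rowSuffix.length);
            return key;
        }

        public static void main(String[] args) {
            byte[] regionStart = {10, 20};
            byte[] regionEnd = {30, 40};
            byte[] startRowSuffix = {1, 2, 3};
            // Prints [10, 20, 1, 2, 3]: the region start key followed by the suffix.
            System.out.println(Arrays.toString(prefixKey(startRowSuffix, regionStart, regionEnd)));
        }
    }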
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index d61e9fe..c021b2c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -752,7 +752,7 @@ public abstract class BaseTest {
         conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class,
             LoadBalancer.class);
         conf.setClass("hbase.coprocessor.regionserver.classes", LocalIndexMerger.class,
-            RegionServerObserver.class);
+            RegionServerObserver.class) ;
         conf.setInt("dfs.namenode.handler.count", 2);
         conf.setInt("dfs.namenode.service.handler.count", 2);
         conf.setInt("dfs.datanode.handler.count", 2);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 47101b2..1da68ba 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -386,7 +386,12 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
                 return ResultIterator.EMPTY_ITERATOR;
             }
-            
+
+            @Override
+            public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+                return ResultIterator.EMPTY_ITERATOR;
+            }
+
             @Override
             public ResultIterator iterator() throws SQLException {
                 return ResultIterator.EMPTY_ITERATOR;
@@ -467,7 +472,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
                 return false;
             }
             
-        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan());
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
index f245840..5cdf234 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.hive.PhoenixRowKey;
 import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -115,8 +116,8 @@ public class PhoenixRecordReader<T extends DBWritable> implements
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes
                         .toBytes(true));
                 final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan
-                        .getContext().getConnection().getMutationState(), queryPlan.getTableRef(), scan,
-                        readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold);
+                        .getContext().getConnection().getMutationState(), scan,
+                        readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance()	);
 
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap
                         (tableResultIterator);


[2/3] phoenix git commit: PHOENIX-2628 Ensure split when iterating through results handled correctly(Rajeshbabu)

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 8848efc..4c6b960 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -85,7 +87,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema";
     public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin";
     public static final String VIEW_CONSTANTS = "_ViewConstants";
-    public static final String STARTKEY_OFFSET = "_StartKeyOffset";
     public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
     public static final String REVERSE_SCAN = "_ReverseScan";
     public static final String ANALYZE_TABLE = "_ANALYZETABLE";
@@ -100,6 +101,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
     public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";
     public final static String SCAN_OFFSET = "_RowOffset";
+    public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix";
+    public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix";
     
     /**
      * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
@@ -135,7 +138,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
         byte[] upperExclusiveRegionKey = region.getRegionInfo().getEndKey();
         boolean isStaleRegionBoundaries;
         if (isLocalIndex) {
-            byte[] expectedUpperRegionKey = scan.getAttribute(EXPECTED_UPPER_REGION_KEY);
+            byte[] expectedUpperRegionKey =
+                    scan.getAttribute(EXPECTED_UPPER_REGION_KEY) == null ? scan.getStopRow() : scan
+                            .getAttribute(EXPECTED_UPPER_REGION_KEY);
             isStaleRegionBoundaries = expectedUpperRegionKey != null &&
                     Bytes.compareTo(upperExclusiveRegionKey, expectedUpperRegionKey) != 0;
         } else {
@@ -147,6 +152,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTable().getNameAsString());
             throw new DoNotRetryIOException(cause.getMessage(), cause);
         }
+        if(isLocalIndex) {
+            ScanUtil.setupLocalIndexScan(scan, lowerInclusiveRegionKey, upperExclusiveRegionKey);
+        }
     }
 
     abstract protected boolean isRegionObserverFor(Scan scan);
@@ -166,7 +174,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
         }
         if (isRegionObserverFor(scan)) {
-            if (! skipRegionBoundaryCheck(scan)) {
+            // For local indexes we need to throw if the scan is out of the region, as we'd
+            // otherwise get inconsistent results. In other cases it may just mean our
+            // client-side cache of region boundaries is out of date and can safely be ignored.
+            if (!skipRegionBoundaryCheck(scan) || ScanUtil.isLocalIndex(scan)) {
                 throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion());
             }
             // Muck with the start/stop row of the scan and set as reversed at the
@@ -227,6 +238,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                 }
             }
         } catch (Throwable t) {
+            // If the exception is a NotServingRegionException, rethrow it as a
+            // StaleRegionBoundaryCacheException so the Phoenix client handles it; otherwise the
+            // HBase client may recreate scans with wrong region boundaries.
+            if(t instanceof NotServingRegionException) {
+                Exception cause = new StaleRegionBoundaryCacheException(c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+                throw new DoNotRetryIOException(cause.getMessage(), cause);
+            }
             ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t);
             return null; // impossible
         }
@@ -280,6 +298,31 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             final ImmutableBytesWritable ptr) {
         return new RegionScanner() {
 
+            private boolean hasReferences = checkForReferenceFiles();
+            private HRegionInfo regionInfo = c.getEnvironment().getRegionInfo();
+            private byte[] actualStartKey = getActualStartKey();
+
+            // If there are any reference files after a local index region merge, in some cases we
+            // might get records whose row key is less than the scan start row key. This happens
+            // when we replace the actual region start key with the merged region's start key.
+            // This method reports whether there are any reference files in the region.
+            private boolean checkForReferenceFiles() {
+                if(!ScanUtil.isLocalIndex(scan)) return false;
+                for (byte[] family : scan.getFamilies()) {
+                    if (c.getEnvironment().getRegion().getStore(family).hasReferences()) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            // Get the actual scan start row of the local index. When there are reference files,
+            // this is used to detect results whose row key is less than the scan start row.
+            public byte[] getActualStartKey() {
+                return ScanUtil.isLocalIndex(scan) ? ScanUtil.getActualStartRow(scan, regionInfo)
+                        : null;
+            }
+
             @Override
             public boolean next(List<Cell> results) throws IOException {
                 try {
@@ -338,6 +381,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                         arrayElementCell = result.get(arrayElementCellPosition);
                     }
                     if (ScanUtil.isLocalIndex(scan) && !ScanUtil.isAnalyzeTable(scan)) {
+                        if(hasReferences && actualStartKey!=null) {
+                            next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
+                                null, arrayElementCell);
+                            if (result.isEmpty()) {
+                                return next;
+                            }
+                        }
                         IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
                             tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
                     }
@@ -370,6 +420,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                     arrayElementCell = result.get(arrayElementCellPosition);
                 }
                 if ((offset > 0 || ScanUtil.isLocalIndex(scan))  && !ScanUtil.isAnalyzeTable(scan)) {
+                    if(hasReferences && actualStartKey!=null) {
+                        next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
+                                    scannerContext, arrayElementCell);
+                        if (result.isEmpty()) {
+                            return next;
+                        }
+                    }
                     IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
                         tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
                 }
@@ -388,6 +445,37 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
               }
             }
 
+            /**
+             * When there is a merge in progress while scanning local indexes, we might get key
+             * values less than the scan start row. In that case we need to keep scanning until we
+             * reach a row key greater than or equal to the scan start key.
+             * TODO try to fix this case in LocalIndexStoreFileScanner when there is a merge.
+             */
+            private boolean scanTillScanStartRow(final RegionScanner s,
+                    final Set<KeyValueColumnExpression> arrayKVRefs,
+                    final Expression[] arrayFuncRefs, List<Cell> result,
+                    ScannerContext scannerContext, Cell arrayElementCell) throws IOException {
+                boolean next = true;
+                Cell firstCell = result.get(0);
+                while (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
+                    firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0) {
+                    result.clear();
+                    if(scannerContext == null) {
+                        next = s.nextRaw(result);
+                    } else {
+                        next = s.nextRaw(result, scannerContext);
+                    }
+                    if (result.isEmpty()) {
+                        return next;
+                    }
+                    if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
+                        int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+                        arrayElementCell = result.get(arrayElementCellPosition);
+                    }
+                    firstCell = result.get(0);
+                }
+                return next;
+            }
+
             private int replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
                     final Expression[] arrayFuncRefs, List<Cell> result) {
                 // make a copy of the results array here, as we're modifying it below

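For illustration only (not part of the commit): a simplified sketch of the idea behind scanTillScanStartRow, i.e. discarding rows whose key sorts below the actual scan start key until the underlying scanner reaches that key. It operates on plain byte-array row keys rather than HBase Cells, and the iterator-based interface here is hypothetical.

    import java.util.Iterator;

    public class SkipBelowStartKeyExample {
        // Returns the first row key that is >= actualStartKey, or null if the scanner is
        // exhausted first. Rows below the start key (possible while a merge leaves
        // reference files behind) are simply skipped.
        static byte[] skipBelowStartKey(Iterator<byte[]> rowKeys, byte[] actualStartKey) {
            while (rowKeys.hasNext()) {
                byte[] rowKey = rowKeys.next();
                if (compare(rowKey, actualStartKey) >= 0) {
                    return rowKey;
                }
            }
            return null;
        }

        // Unsigned lexicographic comparison, the same ordering HBase uses for row keys.
        static int compare(byte[] a, byte[] b) {
            int len = Math.min(a.length, b.length);
            for (int i = 0; i < len; i++) {
                int diff = (a[i] & 0xff) - (b[i] & 0xff);
                if (diff != 0) return diff;
            }
            return a.length - b.length;
        }

        public static void main(String[] args) {
            java.util.List<byte[]> rows = java.util.Arrays.asList(
                    new byte[]{1}, new byte[]{2}, new byte[]{5});
            byte[] first = skipBelowStartKey(rows.iterator(), new byte[]{3});
            System.out.println(first[0]); // prints 5
        }
    }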
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 3237882..3b8efc3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -39,7 +39,6 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -64,6 +63,7 @@ import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.Closeables;
@@ -402,8 +402,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             }
 
             Region region = c.getEnvironment().getRegion();
-            region.startRegionOperation();
+            boolean acquiredLock = false;
             try {
+                region.startRegionOperation();
+                acquiredLock = true;
                 synchronized (scanner) {
                     do {
                         List<Cell> results = new ArrayList<Cell>();
@@ -423,8 +425,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                         }
                     } while (hasMore && groupByCache.size() < limit);
                 }
-            } finally {
-                region.closeRegionOperation();
+            }  finally {
+                if (acquiredLock) region.closeRegionOperation();
             }
 
             RegionScanner regionScanner = groupByCache.getScanner(scanner);
@@ -472,8 +474,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 // start of a new row. Otherwise, we have to wait until an agg
                 int countOffset = rowAggregators.length == 0 ? 1 : 0;
                 Region region = c.getEnvironment().getRegion();
-                region.startRegionOperation();
+                boolean acquiredLock = false;
                 try {
+                    region.startRegionOperation();
+                    acquiredLock = true;
                     synchronized (scanner) {
                         do {
                             List<Cell> kvs = new ArrayList<Cell>();
@@ -505,7 +509,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                         } while (hasMore && !aggBoundary && !atLimit);
                     }
                 } finally {
-                    region.closeRegionOperation();
+                    if (acquiredLock) region.closeRegionOperation();
                 }
 
                 if (currentKey != null) {

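For illustration only (not part of the commit): the guarded-lock idiom this hunk (and the matching one in UngroupedAggregateRegionObserver below) switches to, sketched against a hypothetical Region-like interface. Moving startRegionOperation() inside the try block and tracking acquiredLock ensures closeRegionOperation() is only called when the operation actually started, so a failed acquisition does not trigger a mismatched close.

    public class GuardedRegionOperationExample {
        // Hypothetical stand-in for the HBase Region methods used in the observers.
        interface RegionLike {
            void startRegionOperation() throws Exception;
            void closeRegionOperation() throws Exception;
        }

        static void scanWithGuard(RegionLike region) throws Exception {
            boolean acquiredLock = false;
            try {
                region.startRegionOperation();
                acquiredLock = true;
                // ... iterate over the scanner and aggregate results here ...
            } finally {
                // Only release if the operation was actually started; otherwise a failed
                // start would be followed by a spurious close.
                if (acquiredLock) region.closeRegionOperation();
            }
        }
    }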
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 77b8b3e..8616b6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -86,6 +86,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
@@ -288,8 +289,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
         long rowCount = 0;
         final RegionScanner innerScanner = theScanner;
-        region.startRegionOperation();
+        boolean acquiredLock = false;
         try {
+            region.startRegionOperation();
+            acquiredLock = true;
             synchronized (innerScanner) {
                 do {
                     List<Cell> results = new ArrayList<Cell>();
@@ -529,7 +532,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             try {
                 innerScanner.close();
             } finally {
-                region.closeRegionOperation();
+                if (acquiredLock) region.closeRegionOperation();
             }
         }
         if (logger.isDebugEnabled()) {
@@ -608,7 +611,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         InternalScanner internalScanner = scanner;
         if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
             try {
-                Pair<HRegionInfo, HRegionInfo> mergeRegions = null;
                 long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
                 StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
                         c.getEnvironment(), table.getNameAsString(), clientTimeStamp,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index b60cd7e..94d1fc8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -126,7 +127,7 @@ public class AggregatePlan extends BaseQueryPlan {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException {
             Expression expression = RowKeyExpression.INSTANCE;
             OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
             int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -143,9 +144,9 @@ public class AggregatePlan extends BaseQueryPlan {
             this.outerFactory = outerFactory;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
-            PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName);
-            return outerFactory.newIterator(context, iterator, scan, tableName);
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException {
+            PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName, plan);
+            return outerFactory.newIterator(context, iterator, scan, tableName, plan);
         }
     }
 
@@ -169,12 +170,12 @@ public class AggregatePlan extends BaseQueryPlan {
     }
     
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         if (groupBy.isEmpty()) {
-            UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
+            UngroupedAggregateRegionObserver.serializeIntoScan(scan);
         } else {
             // Set attribute with serialized expressions for coprocessor
-            GroupedAggregateRegionObserver.serializeIntoScan(context.getScan(), groupBy.getScanAttribName(), groupBy.getKeyExpressions());
+            GroupedAggregateRegionObserver.serializeIntoScan(scan, groupBy.getScanAttribName(), groupBy.getKeyExpressions());
             if (limit != null && orderBy.getOrderByExpressions().isEmpty() && having == null
                     && (  (   statement.isDistinct() && ! statement.isAggregate() )
                             || ( ! statement.isDistinct() && (   context.getAggregationManager().isEmpty()
@@ -200,7 +201,7 @@ public class AggregatePlan extends BaseQueryPlan {
                  *    order, so we can early exit, even when aggregate functions are used, as
                  *    the rows in the group are contiguous.
                  */
-                context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT,
+                scan.setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT,
                         PInteger.INSTANCE.toBytes(limit + (offset == null ? 0 : offset)));
             }
         }
@@ -211,8 +212,8 @@ public class AggregatePlan extends BaseQueryPlan {
             logger.warn("This query cannot be executed serially. Ignoring the hint");
         }
         BaseResultIterators iterators = hasSerialHint && canBeExecutedSerially
-                ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper)
-                : new ParallelIterators(this, null, wrapParallelIteratorFactory());
+                ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan)
+                : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan);
 
         splits = iterators.getSplits();
         scans = iterators.getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index cedd23e..83e55ee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -203,26 +203,30 @@ public abstract class BaseQueryPlan implements QueryPlan {
     
     @Override
     public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper);
+        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, this.context.getScan());
     }
-    
+
+    @Override
+    public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, scan);
+    }
+
     @Override
     public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance());
+        return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance(), this.context.getScan());
     }
 
-    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper) throws SQLException {
+    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         if (context.getScanRanges() == ScanRanges.NOTHING) {
             return ResultIterator.EMPTY_ITERATOR;
         }
         
         if (tableRef == TableRef.EMPTY_TABLE_REF) {
-            return newIterator(scanGrouper);
+            return newIterator(scanGrouper, scan);
         }
         
         // Set miscellaneous scan attributes. This is the last chance to set them before we
         // clone the scan for each parallelized chunk.
-        Scan scan = context.getScan();
         TableRef tableRef = context.getCurrentTable();
         PTable table = tableRef.getTable();
         
@@ -319,7 +323,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         	LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection));
         }
         
-        ResultIterator iterator = newIterator(scanGrouper);
+        ResultIterator iterator = newIterator(scanGrouper, scan);
         iterator = dependencies.isEmpty() ?
                 iterator : new DelegateResultIterator(iterator) {
             @Override
@@ -448,7 +452,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException;
+    abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
     
     @Override
     public long getEstimatedSize() {

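For illustration only (not part of the commit): the shape of the new QueryPlan overload as the plans above use it, sketched against simplified stand-in types rather than the real Phoenix interfaces. The pre-existing single-argument iterator(scanGrouper) delegates to the new two-argument form with the scan from the statement context, so callers that hold a freshly bounded Scan (for example a per-chunk scan rebuilt after a split or merge) can pass it in directly.

    import java.sql.SQLException;

    public class IteratorOverloadExample {
        // Simplified stand-ins for the Phoenix types involved.
        interface Scan {}
        interface ParallelScanGrouper {}
        interface ResultIterator {}

        static abstract class SimplifiedQueryPlan {
            protected Scan contextScan; // would come from context.getScan() in Phoenix

            // Existing entry point: keeps its behaviour by delegating to the new overload.
            public final ResultIterator iterator(ParallelScanGrouper grouper) throws SQLException {
                return iterator(grouper, contextScan);
            }

            // New overload: the scan to use is supplied by the caller, e.g. a per-chunk
            // scan rebuilt after a region split or merge.
            public abstract ResultIterator iterator(ParallelScanGrouper grouper, Scan scan)
                    throws SQLException;
        }
    }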
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index f4e374e..eb048f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -83,6 +84,11 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return iterator(scanGrouper, null);
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         ResultIterator iterator = delegate.iterator(scanGrouper);
         if (where != null) {
             iterator = new FilterResultIterator(iterator, where);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 003c995..4e43225 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
@@ -50,7 +51,11 @@ public class ClientScanPlan extends ClientProcessingPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
-        ResultIterator iterator = delegate.iterator(scanGrouper);
+        return iterator(scanGrouper, delegate.getContext().getScan());
+    }
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+        ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {
             iterator = new FilterResultIterator(iterator, where);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 1b0af8c..fc5a04d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
@@ -103,7 +104,12 @@ public class CorrelatePlan extends DelegateQueryPlan {
     }
 
     @Override
-    public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) 
+                throws SQLException {
+        return iterator(scanGrouper, null);
+    }
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan)
             throws SQLException {
         return new ResultIterator() {
             private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 36b725e..5887ff3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -52,7 +52,7 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 5fdec46..d332f68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -140,6 +140,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return iterator(scanGrouper, this.delegate.getContext().getScan());
+    }
+        
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         int count = subPlans.length;
         PhoenixConnection connection = getContext().getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -216,11 +221,10 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         if (joinInfo != null) {
-            Scan scan = delegate.getContext().getScan();
             HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
         }
         
-        ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper);
+        ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper, scan) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper, scan);
         if (statement.getInnerSelectStatement() != null && postFilter != null) {
             iterator = new FilterResultIterator(iterator, postFilter);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index fe767d9..db99964 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -71,7 +71,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper)
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan)
             throws SQLException {
         ResultIterator scanner = new ResultIterator() {
             private final Iterator<Tuple> tupleIterator = tuples.iterator();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 980db52..9fbdb3a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -200,9 +200,8 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be too late afterwards
-    	Scan scan = context.getScan();
         scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
         ResultIterator scanner;
         TableRef tableRef = this.getTableRef();
@@ -220,11 +219,11 @@ public class ScanPlan extends BaseQueryPlan {
         BaseResultIterators iterators;
         boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType());
         if (isOffsetOnServer) {
-            iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper);
+            iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan);
         } else if (isSerial) {
-            iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper);
+            iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan);
         } else {
-            iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper);
+            iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan);
         }
         splits = iterators.getSplits();
         scans = iterators.getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index e181e80..8e0e6e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -131,7 +131,12 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
-    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {        
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return iterator(scanGrouper, null);
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {        
         return type == JoinType.Semi || type == JoinType.Anti ? 
                 new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)) :
                 new BasicJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index e8d9af0..0ba0cc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.expression.Expression;
@@ -60,7 +61,12 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
-        ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper)) {
+        return iterator(scanGrouper, delegate.getContext().getScan());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+        ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper, scan)) {
             
             @Override
             public Tuple next() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 808141e..cf95b5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -139,7 +139,11 @@ public class UnionPlan implements QueryPlan {
     public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
         return iterator(Collections.<SQLCloseable>emptyList());
     }
-    
+
+    @Override
+    public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList());
+    }
     @Override
     public final ResultIterator iterator() throws SQLException {
         return iterator(Collections.<SQLCloseable>emptyList());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 8905eef..94c59df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
@@ -53,7 +54,12 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
-        return new UnnestArrayResultIterator(delegate.iterator(scanGrouper));
+        return new UnnestArrayResultIterator(delegate.iterator(scanGrouper, delegate.getContext().getScan()));
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+        return new UnnestArrayResultIterator(delegate.iterator(scanGrouper, scan));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 043bd30..bee2201 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,8 +17,10 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
@@ -116,15 +118,16 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     private final List<KeyRange> splits;
     private final PTableStats tableStats;
     private final byte[] physicalTableName;
-    private final QueryPlan plan;
+    protected final QueryPlan plan;
     protected final String scanId;
     protected final MutationState mutationState;
-    private final ParallelScanGrouper scanGrouper;
+    protected final ParallelScanGrouper scanGrouper;
     // TODO: too much nesting here - breakup into new classes.
     private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
     private Long estimatedRows;
     private Long estimatedSize;
     private boolean hasGuidePosts;
+    private Scan scan;
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
         @Override
@@ -138,7 +141,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     }
     
     private boolean useStats() {
-        Scan scan = context.getScan();
         boolean isPointLookup = context.getScanRanges().isPointLookup();
         /*
          *  Don't use guide posts if:
@@ -153,11 +155,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return true;
     }
     
-    private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset) {
+    private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) {
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
-        Scan scan = context.getScan();
 
         Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
         // Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix their row keys
@@ -331,10 +332,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         }
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper) throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(),
                 plan.getStatement().getHint(), plan.getLimit(), plan instanceof ScanPlan ? plan.getOffset() : null);
         this.plan = plan;
+        this.scan = scan;
         this.scanGrouper = scanGrouper;
         StatementContext context = plan.getContext();
         // Clone MutationState as the one on the connection will change if auto commit is on
@@ -347,7 +349,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         // Used to tie all the scans together during logging
         scanId = UUID.randomUUID().toString();
         
-        initializeScan(plan, perScanLimit, offset);
+        initializeScan(plan, perScanLimit, offset, scan);
         
         this.scans = getParallelScans();
         List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
@@ -471,10 +473,69 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     }
 
     private List<List<Scan>> getParallelScans() throws SQLException {
+        // If the scan boundaries do not match the scan in the context, we need to get parallel
+        // scans for this chunk after a split/merge.
+        if (!ScanUtil.isConextScan(scan, context)) {
+            return getParallelScans(scan);
+        }
         return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
     }
 
     /**
+     * Get parallel scans for the specified scan boundaries. This can be used to get parallel
+     * scans when a split or merge occurs while scanning a chunk. In this case we need not go
+     * over all the regions or guideposts.
+     * @param scan
+     * @return
+     * @throws SQLException
+     */
+    private List<List<Scan>> getParallelScans(Scan scan) throws SQLException {
+        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+                .getAllTableRegions(physicalTableName);
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        int regionIndex = 0;
+        int stopIndex = regionBoundaries.size();
+        if (scan.getStartRow().length > 0) {
+            regionIndex = getIndexContainingInclusive(regionBoundaries, scan.getStartRow());
+        }
+        if (scan.getStopRow().length > 0) {
+            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), scan.getStopRow()));
+        }
+        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+        List<Scan> scans = Lists.newArrayListWithExpectedSize(2);
+        while (regionIndex <= stopIndex) {
+            HRegionLocation regionLocation = regionLocations.get(regionIndex);
+            HRegionInfo regionInfo = regionLocation.getRegionInfo();
+            Scan newScan = ScanUtil.newScan(scan);
+            byte[] endKey;
+            if (regionIndex == stopIndex) {
+                endKey = scan.getStopRow();
+            } else {
+                endKey = regionBoundaries.get(regionIndex);
+            }
+            if(ScanUtil.isLocalIndex(scan)) {
+                ScanUtil.setLocalIndexAttributes(newScan, 0, regionInfo.getStartKey(),
+                    regionInfo.getEndKey(), newScan.getAttribute(SCAN_START_ROW_SUFFIX),
+                    newScan.getAttribute(SCAN_STOP_ROW_SUFFIX));
+            } else {
+                if(Bytes.compareTo(scan.getStartRow(), regionInfo.getStartKey())<=0) {
+                    newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionInfo.getStartKey());
+                    newScan.setStartRow(regionInfo.getStartKey());
+                }
+                if(scan.getStopRow().length == 0 || (regionInfo.getEndKey().length != 0 && Bytes.compareTo(scan.getStopRow(), regionInfo.getEndKey())>0)) {
+                    newScan.setStopRow(regionInfo.getEndKey());
+                }
+            }
+            scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
+            regionIndex++;
+        }
+        if (!scans.isEmpty()) { // Add any remaining scans
+            parallelScans.add(scans);
+        }
+        return parallelScans;
+    }
+
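[Editor's note: to make the split/merge recovery path above easier to follow, here is a standalone sketch, not part of this commit, of the same idea in isolation: the resubmitted scan's key range is sliced along the current region boundaries so that each sub-scan stays inside one region. The class and method names (RegionSliceSketch, sliceByRegionBoundaries) are hypothetical, not Phoenix APIs.]

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.util.Bytes;

    // Illustrative sketch only (not part of this commit); names are hypothetical.
    public class RegionSliceSketch {
        /**
         * Returns [start, stop) pairs, one per region that overlaps the scan range.
         * An empty startRow/stopRow means "unbounded", following HBase conventions.
         */
        static List<byte[][]> sliceByRegionBoundaries(byte[] startRow, byte[] stopRow,
                List<byte[]> regionStartKeys) {
            List<byte[][]> slices = new ArrayList<byte[][]>();
            for (int i = 0; i < regionStartKeys.size(); i++) {
                byte[] regionStart = regionStartKeys.get(i);
                byte[] regionEnd = i + 1 < regionStartKeys.size()
                        ? regionStartKeys.get(i + 1) : new byte[0];
                // Skip regions that end before the scan starts.
                if (regionEnd.length > 0 && Bytes.compareTo(regionEnd, startRow) <= 0) continue;
                // Stop once regions begin at or after the scan's stop row.
                if (stopRow.length > 0 && Bytes.compareTo(regionStart, stopRow) >= 0) break;
                byte[] sliceStart = Bytes.compareTo(startRow, regionStart) > 0 ? startRow : regionStart;
                byte[] sliceStop = (stopRow.length == 0
                        || (regionEnd.length > 0 && Bytes.compareTo(regionEnd, stopRow) < 0))
                        ? regionEnd : stopRow;
                slices.add(new byte[][] { sliceStart, sliceStop });
            }
            return slices;
        }
    }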
+    /**
      * Compute the list of parallel scans to run for a given query. The inner scans
      * may be concatenated together directly, while the other ones may need to be
      * merge sorted, depending on the query.
@@ -482,7 +543,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
      * @throws SQLException
      */
     private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
-        Scan scan = context.getScan();
         List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
                 .getAllTableRegions(physicalTableName);
         List<byte[]> regionBoundaries = toBoundaries(regionLocations);
@@ -555,6 +615,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     
             // Merge bisect with guideposts for all but the last region
             while (regionIndex <= stopIndex) {
+                HRegionLocation regionLocation = regionLocations.get(regionIndex);
+                HRegionInfo regionInfo = regionLocation.getRegionInfo();
                 byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
                 byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
                 if (regionIndex == stopIndex) {
@@ -562,9 +624,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 } else {
                     endKey = regionBoundaries.get(regionIndex);
                 }
-                HRegionLocation regionLocation = regionLocations.get(regionIndex);
                 if (isLocalIndex) {
-                    HRegionInfo regionInfo = regionLocation.getRegionInfo();
                     endRegionKey = regionInfo.getEndKey();
                     keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
                 }
@@ -572,6 +632,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
                         Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
                                 false);
+                        if(newScan != null) {
+                            ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
+                                regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
+                        }
                         if (newScan != null) {
                             estimatedRows += gps.getRowCounts().get(guideIndex);
                             estimatedSize += gps.getByteCounts().get(guideIndex);
@@ -584,12 +648,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     }
                 } catch (EOFException e) {}
                 Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true);
-                if (isLocalIndex) {
-                    if (newScan != null) {
-                        newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
-                    } else if (!scans.isEmpty()) {
-                        scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
-                    }
+                if(newScan != null) {
+                    ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
+                        regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
                 }
                 scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
                 currentKeyBytes = endKey;
@@ -628,7 +689,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
      */
     @Override
     public List<PeekingResultIterator> getIterators() throws SQLException {
-        Scan scan = context.getScan();
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Getting iterators for " + this,
                     ScanUtil.getCustomAnnotations(scan)));
@@ -676,7 +736,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         SQLException toThrow = null;
         int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
         try {
-            submitWork(scan, futures, allIterators, splitSize, isReverse);
+            submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
                 List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
@@ -687,15 +747,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException(); 
                         }
                         if (isLocalIndex && previousScan != null && previousScan.getScan() != null
-                                && ((!isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
+                                && (((!isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
                                         previousScan.getScan().getStopRow()) < 0)
                                 || (isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
                                         previousScan.getScan().getStopRow()) > 0)
-                                || (scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY) != null
-                                        && previousScan.getScan().getAttribute(EXPECTED_UPPER_REGION_KEY) != null
-                                        && Bytes.compareTo(scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY),
-                                                previousScan.getScan()
-                                                        .getAttribute(EXPECTED_UPPER_REGION_KEY)) == 0))) {
+                                || (Bytes.compareTo(scanPair.getFirst().getStopRow(), previousScan.getScan().getStopRow()) == 0)) 
+                                    && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_START_ROW_SUFFIX), previousScan.getScan().getAttribute(SCAN_START_ROW_SUFFIX))==0)) {
                             continue;
                         }
                         PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
@@ -714,9 +771,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             Scan oldScan = scanPair.getFirst();
                             byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
                             byte[] endKey = oldScan.getStopRow();
-                            if (isLocalIndex) {
-                                endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
-                            }
                             
                             List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
                             // Add any concatIterators that were successful so far
@@ -868,7 +922,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException;
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException;
     
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 7a830de..a12d40c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -18,7 +18,7 @@
 
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 
 import java.sql.SQLException;
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.query.QueryServices;
@@ -62,6 +63,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     private final MutationState mutationState;
     private Scan scan;
     private PeekingResultIterator resultIterator;
+    private QueryPlan plan;
     
     /**
      * Chunking is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17, 
@@ -84,30 +86,31 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         }
 
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException {
             if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
             return new ChunkedResultIterator(delegateFactory, mutationState, context, tableRef, scan, 
                     mutationState.getConnection().getQueryServices().getProps().getLong(
                                 QueryServices.SCAN_RESULT_CHUNK_SIZE,
-                                QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
+                                QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner, plan);
         }
     }
 
     private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, MutationState mutationState,
-    		StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException {
+    		StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;
         this.tableRef = tableRef;
         this.scan = scan;
         this.chunkSize = chunkSize;
         this.mutationState = mutationState;
+        this.plan = plan;
         // Instantiate single chunk iterator and the delegate iterator in constructor
         // to get parallel scans kicked off in separate threads. If we delay this,
         // we'll get serialized behavior (see PHOENIX-
         if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
         ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(scanner, chunkSize);
         String tableName = tableRef.getTable().getPhysicalName().getString();
-        resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
+        resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
     }
 
     @Override
@@ -134,13 +137,20 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         if (resultIterator.peek() == null && lastKey != null) {
             resultIterator.close();
             scan = ScanUtil.newScan(scan);
-            scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
+            if(ScanUtil.isLocalIndex(scan)) {
+                scan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.copyKeyBytesIfNecessary(lastKey));
+            } else {
+                scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
+            }
             if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
             String tableName = tableRef.getTable().getPhysicalName().getString();
             long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
-            ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
-                    new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold), chunkSize);
-            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
+            ResultIterator singleChunkResultIterator =
+                    new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,
+                            context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName),
+                            renewLeaseThreshold, plan, DefaultParallelScanGrouper.getInstance()),
+                            chunkSize);
+            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
         }
         return resultIterator;
     }
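[Editor's note on the SCAN_START_ROW_SUFFIX attribute used above: for local index scans the client records only the part of the row key that follows the region start key, and the region server rebuilds the actual boundary by prefixing its own start key. A minimal sketch of that prefixing follows, assuming a hypothetical buildStartRow helper that is not a Phoenix API.]

    import org.apache.hadoop.hbase.util.Bytes;

    // Illustrative sketch only (not part of this commit).
    public class StartRowSuffixSketch {
        // Hypothetical helper: combine a region start key with the stored row-key
        // suffix to get the actual start row used on the region server.
        static byte[] buildStartRow(byte[] regionStartKey, byte[] startRowSuffix) {
            if (startRowSuffix == null || startRowSuffix.length == 0) {
                return regionStartKey; // no suffix yet, start at the region boundary
            }
            return Bytes.add(regionStartKey, startRowSuffix);
        }

        public static void main(String[] args) {
            byte[] regionStart = Bytes.toBytes("j"); // e.g. a region covering [j, q)
            byte[] suffix = Bytes.toBytes("m");      // suffix captured from the last chunk
            // Prints "jm": the suffix resumes the scan inside the same region.
            System.out.println(Bytes.toString(buildStartRow(regionStart, suffix)));
        }
    }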
@@ -174,9 +184,6 @@ public class ChunkedResultIterator implements PeekingResultIterator {
                 // be able to start the next chunk on the next row key
                 if (rowCount == chunkSize) {
                     next.getKey(lastKey);
-                    if (scan.getAttribute(STARTKEY_OFFSET) != null) {
-                        addRegionStartKeyToLaskKey();
-                    }
                 } else if (rowCount > chunkSize && rowKeyChanged(next)) {
                     chunkComplete = true;
                     return null;
@@ -203,29 +210,10 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             int offset = lastKey.getOffset();
             int length = lastKey.getLength();
             newTuple.getKey(lastKey);
-            if (scan.getAttribute(STARTKEY_OFFSET) != null) {
-                addRegionStartKeyToLaskKey();
-            }
 
             return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0;
         }
 
-        /**
-         * Prefix region start key to last key to form actual row key in case of local index scan.
-         */
-        private void addRegionStartKeyToLaskKey() {
-            byte[] offsetBytes = scan.getAttribute(STARTKEY_OFFSET);
-            if (offsetBytes != null) {
-                int startKeyOffset = Bytes.toInt(offsetBytes);
-                byte[] actualLastkey =
-                        new byte[startKeyOffset + lastKey.getLength() - lastKey.getOffset()];
-                System.arraycopy(scan.getStartRow(), 0, actualLastkey, 0, startKeyOffset);
-                System.arraycopy(lastKey.get(), lastKey.getOffset(), actualLastkey,
-                    startKeyOffset, lastKey.getLength());
-                lastKey.set(actualLastkey);
-            }
-        }
-
 		@Override
 		public String toString() {
 			return "SingleChunkResultIterator [rowCount=" + rowCount

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index 2258caf..b720b56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.TableRef;
@@ -28,8 +29,8 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac
 
     @Override
     public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
-            CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException {
-        return new TableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+            CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+        return new TableResultIterator(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index f25e373..dbe9910 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -20,15 +20,16 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 
 public interface ParallelIteratorFactory {
     public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() {
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName)
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan)
                 throws SQLException {
             return LookAheadResultIterator.wrap(scanner);
         }
     };
-    PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException;
+    PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan) throws SQLException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index a5664c7..d038f77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -54,20 +54,20 @@ public class ParallelIterators extends BaseResultIterators {
 	private static final String NAME = "PARALLEL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan)
             throws SQLException {
-        super(plan, perScanLimit, null, scanGrouper);
+        super(plan, perScanLimit, null, scanGrouper, scan);
         this.iteratorFactory = iteratorFactory;
     }   
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan)
             throws SQLException {
-        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance());
+        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan);
     }  
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -98,7 +98,7 @@ public class ParallelIterators extends BaseResultIterators {
             final Scan scan = scanLocation.getScan();
             final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
             final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
-            final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+            final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
             context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 
@@ -109,7 +109,7 @@ public class ParallelIterators extends BaseResultIterators {
                     if (logger.isDebugEnabled()) {
                         logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                     }
-                    PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName);
+                    PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName, ParallelIterators.this.plan);
                     // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
                     iterator.peek();
                     allIterators.add(iterator);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index d2c89b9..0d0c5d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -59,9 +59,9 @@ public class SerialIterators extends BaseResultIterators {
     private final Integer offset;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset,
-            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
+            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan)
             throws SQLException {
-        super(plan, perScanLimit, offset, scanGrouper);
+        super(plan, perScanLimit, offset, scanGrouper, scan);
         this.offset = offset;
         // must be a offset or a limit specified or a SERIAL hint
         Preconditions.checkArgument(
@@ -71,7 +71,7 @@ public class SerialIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper) {
         ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
         final String tableName = tableRef.getTable().getPhysicalName().getString();
         final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
@@ -165,8 +165,8 @@ public class SerialIterators extends BaseResultIterators {
                 if (remainingOffset != null) {
                     currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset));
                 }
-                TableResultIterator itr = new TableResultIterator(mutationState, tableRef, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold);
-                PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName);
+                TableResultIterator itr = new TableResultIterator(mutationState, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, plan, scanGrouper);
+                PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
                 Tuple tuple;
                 if ((tuple = peekingItr.peek()) == null) {
                     peekingItr.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 4f85a5f..3136ca8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -36,6 +36,7 @@ import org.apache.commons.io.output.DeferredFileOutputStream;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
@@ -84,7 +85,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan) throws SQLException {
             ReadMetricQueue readRequestMetric = context.getReadMetricsQueue();
             SpoolingMetricsHolder spoolMetrics = new SpoolingMetricsHolder(readRequestMetric, physicalTableName);
             MemoryMetricsHolder memoryMetrics = new MemoryMetricsHolder(readRequestMetric, physicalTableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index a7e3068..3fe6098 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
@@ -32,12 +34,17 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -57,6 +64,10 @@ public class TableResultIterator implements ResultIterator {
     private final CombinableMetric scanMetrics;
     private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR;
     private final long renewLeaseThreshold;
+    private final QueryPlan plan;
+    private final ParallelScanGrouper scanGrouper;
+    private Tuple lastTuple = null;
+    private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 
     @GuardedBy("this")
     private ResultIterator scanIterator;
@@ -73,20 +84,24 @@ public class TableResultIterator implements ResultIterator {
         this.renewLeaseThreshold = 0;
         this.htable = null;
         this.scan = null;
+        this.plan = null;
+        this.scanGrouper = null;
     }
 
     public static enum RenewLeaseStatus {
         RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED
     };
 
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics,
-            long renewLeaseThreshold) throws SQLException {
+    public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics,
+            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
         this.scan = scan;
         this.scanMetrics = scanMetrics;
-        PTable table = tableRef.getTable();
+        this.plan = plan;
+        PTable table = plan.getTableRef().getTable();
         htable = mutationState.getHTable(table);
         this.scanIterator = UNINITIALIZED_SCANNER;
         this.renewLeaseThreshold = renewLeaseThreshold;
+        this.scanGrouper = scanGrouper;
     }
 
     @Override
@@ -107,8 +122,42 @@ public class TableResultIterator implements ResultIterator {
     @Override
     public synchronized Tuple next() throws SQLException {
         initScanner();
-        Tuple t = scanIterator.next();
-        return t;
+        try {
+            lastTuple = scanIterator.next();
+            if (lastTuple != null) {
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                lastTuple.getKey(ptr);
+            }
+        } catch (SQLException e) {
+            try {
+                throw ServerUtil.parseServerException(e);
+            } catch(StaleRegionBoundaryCacheException e1) {
+                if(ScanUtil.isNonAggregateScan(scan)) {
+                    // For non-aggregate queries, if we get a stale region boundary exception we can
+                    // continue scanning from the key following the last fetched result.
+                    Scan newScan = ScanUtil.newScan(scan);
+                    newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW));
+                    if(lastTuple != null) {
+                        lastTuple.getKey(ptr);
+                        byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                        if(ScanUtil.isLocalIndex(newScan)) {
+                            // If we only set the scan start row suffix, the server side prepares
+                            // the actual scan boundaries by prefixing the region start key.
+                            newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix));
+                        } else {
+                            newScan.setStartRow(ByteUtil.nextKey(startRowSuffix));
+                        }
+                    }
+                    plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
+                    this.scanIterator =
+                            plan.iterator(scanGrouper, newScan);
+                    lastTuple = scanIterator.next();
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return lastTuple;
     }
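[Editor's note: the retry above is easier to see in isolation. The standalone sketch below, not part of this commit, shows only its control flow; it omits the non-aggregate check and server-exception unwrapping the real code performs, and the abstract hooks are hypothetical stand-ins for Phoenix's scanner, region cache and plan calls.]

    // Illustrative sketch only (not part of this commit); hooks are hypothetical.
    public abstract class StaleBoundaryRetrySketch<ROW> {
        /** Stand-in for StaleRegionBoundaryCacheException in this sketch. */
        public static class StaleBoundaryException extends Exception {}

        private ROW lastRow; // last row already handed back to the caller, if any

        // Hypothetical hooks: fetch the next row, and reopen the scanner after
        // clearing the stale region cache, resuming just past lastRowOrNull.
        protected abstract ROW fetchNext() throws StaleBoundaryException;
        protected abstract void clearCacheAndReopenAfter(ROW lastRowOrNull) throws StaleBoundaryException;

        public ROW next() throws StaleBoundaryException {
            try {
                lastRow = fetchNext();
            } catch (StaleBoundaryException e) {
                // A split or merge invalidated the cached region boundaries: reopen
                // the scan just after the last row already returned and retry once.
                clearCacheAndReopenAfter(lastRow);
                lastRow = fetchNext();
            }
            return lastRow;
        }
    }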
 
     public synchronized void initScanner() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index 0f5ac9b..8d7b54d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -20,10 +20,11 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.TableRef;
 
 public interface TableResultIteratorFactory {
-    public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException;        
+    public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException;        
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 5b799a0..0c154e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -491,6 +491,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 }
 
                 @Override
+                public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+                    return iterator;
+                }
+
+                @Override
                 public long getEstimatedSize() {
                     return 0;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 51ac795..17d9b6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -50,6 +51,7 @@ import org.apache.phoenix.monitoring.ReadMetricQueue;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+
 import org.apache.phoenix.query.ConnectionQueryServices;
 
 /**
@@ -120,8 +122,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             for (Scan scan : scans) {
                 // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
-                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(),
-                        queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold);
+                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance());
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }


[3/3] phoenix git commit: PHOENIX-2628 Ensure split when iterating through results handled correctly(Rajeshbabu)

Posted by ra...@apache.org.
PHOENIX-2628 Ensure split when iterating through results handled correctly(Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d700c1f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d700c1f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d700c1f0

Branch: refs/heads/master
Commit: d700c1f032a0f5d119c669100648caf040233ebe
Parents: 99713a6
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Wed May 4 18:55:49 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Wed May 4 18:55:49 2016 +0530

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/BaseViewIT.java  |   5 +
 .../phoenix/end2end/index/LocalIndexIT.java     |  51 +--
 .../phoenix/end2end/index/MutableIndexIT.java   | 239 ++++++++++-
 .../DelayedTableResultIteratorFactory.java      |   9 +-
 .../iterate/MockParallelIteratorFactory.java    |   3 +-
 .../regionserver/IndexHalfStoreFileReader.java  | 412 +------------------
 .../IndexHalfStoreFileReaderGenerator.java      | 123 +++---
 .../hbase/regionserver/LocalIndexSplitter.java  |  37 --
 .../LocalIndexStoreFileScanner.java             | 254 ++++++++++++
 .../phoenix/compile/ListJarsQueryPlan.java      |   4 +
 .../MutatingParallelIteratorFactory.java        |   2 +-
 .../org/apache/phoenix/compile/QueryPlan.java   |   2 +
 .../org/apache/phoenix/compile/ScanRanges.java  |  11 +-
 .../apache/phoenix/compile/TraceQueryPlan.java  |   5 +
 .../coprocessor/BaseScannerRegionObserver.java  |  94 ++++-
 .../GroupedAggregateRegionObserver.java         |  16 +-
 .../UngroupedAggregateRegionObserver.java       |   8 +-
 .../apache/phoenix/execute/AggregatePlan.java   |  21 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  20 +-
 .../phoenix/execute/ClientAggregatePlan.java    |   6 +
 .../apache/phoenix/execute/ClientScanPlan.java  |   7 +-
 .../apache/phoenix/execute/CorrelatePlan.java   |   8 +-
 .../phoenix/execute/DegenerateQueryPlan.java    |   2 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |   8 +-
 .../execute/LiteralResultIterationPlan.java     |   2 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |   9 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |   7 +-
 .../phoenix/execute/TupleProjectionPlan.java    |   8 +-
 .../org/apache/phoenix/execute/UnionPlan.java   |   6 +-
 .../apache/phoenix/execute/UnnestArrayPlan.java |   8 +-
 .../phoenix/iterate/BaseResultIterators.java    | 112 +++--
 .../phoenix/iterate/ChunkedResultIterator.java  |  50 +--
 .../DefaultTableResultIteratorFactory.java      |   5 +-
 .../iterate/ParallelIteratorFactory.java        |   5 +-
 .../phoenix/iterate/ParallelIterators.java      |  14 +-
 .../apache/phoenix/iterate/SerialIterators.java |  10 +-
 .../phoenix/iterate/SpoolingResultIterator.java |   3 +-
 .../phoenix/iterate/TableResultIterator.java    |  59 ++-
 .../iterate/TableResultIteratorFactory.java     |   3 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   5 +
 .../phoenix/mapreduce/PhoenixRecordReader.java  |   5 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |  64 +++
 .../java/org/apache/phoenix/query/BaseTest.java |   2 +-
 .../query/ParallelIteratorsSplitTest.java       |   9 +-
 .../hive/mapreduce/PhoenixRecordReader.java     |   5 +-
 45 files changed, 1078 insertions(+), 660 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
index 7e7175f..65f1f93 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
@@ -33,6 +33,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -45,6 +46,7 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
@@ -98,6 +100,9 @@ public abstract class BaseViewIT extends BaseOwnClusterHBaseManagedTimeIT {
         if (saltBuckets == null) {
             try (Connection conn = DriverManager.getConnection(getUrl())) {
                 HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName));
+                if(ScanUtil.isLocalIndex(scan)) {
+                    ScanUtil.setLocalIndexAttributes(scan, 0, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, scan.getStartRow(), scan.getStopRow());
+                }
                 ResultScanner scanner = htable.getScanner(scan);
                 Result result = scanner.next();
                 // Confirm index has rows

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 2d79f36..f7edea7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -790,15 +790,27 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
                             admin.getConnection(), indexTable, false);
                 }
                 assertEquals(4 + i, regionsOfIndexTable.size());
+                String[] tIdColumnValues = new String[26]; 
+                String[] v1ColumnValues = new String[26];
+                int[] k1ColumnValue = new int[26];
                 String query = "SELECT t_id,k1,v1 FROM " + tableName;
                 rs = conn1.createStatement().executeQuery(query);
                 Thread.sleep(1000);
                 for (int j = 0; j < 26; j++) {
                     assertTrue(rs.next());
-                    assertEquals(strings[25-j], rs.getString("t_id"));
-                    assertEquals(25-j, rs.getInt("k1"));
-                    assertEquals(strings[j], rs.getString("V1"));
+                    tIdColumnValues[j] = rs.getString("t_id");
+                    k1ColumnValue[j] = rs.getInt("k1");
+                    v1ColumnValues[j] = rs.getString("V1");
                 }
+                Arrays.sort(tIdColumnValues);
+                Arrays.sort(v1ColumnValues);
+                Arrays.sort(k1ColumnValue);
+                assertTrue(Arrays.equals(strings, tIdColumnValues));
+                assertTrue(Arrays.equals(strings, v1ColumnValues));
+                for(int m=0;m<26;m++) {
+                    assertEquals(m, k1ColumnValue[m]);
+                }
+
                 rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
                 assertEquals(
                         "CLIENT PARALLEL " + (4 + i) + "-WAY RANGE SCAN OVER "
@@ -817,11 +829,20 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
                             + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
                 rs = conn1.createStatement().executeQuery(query);
                 Thread.sleep(1000);
+                int[] k3ColumnValue = new int[26];
                 for (int j = 0; j < 26; j++) {
                     assertTrue(rs.next());
-                    assertEquals(strings[j], rs.getString("t_id"));
-                    assertEquals(j, rs.getInt("k1"));
-                    assertEquals(j+2, rs.getInt("k3"));
+                    tIdColumnValues[j] = rs.getString("t_id");
+                    k1ColumnValue[j] = rs.getInt("k1");
+                    k3ColumnValue[j] = rs.getInt("k3");
+                }
+                Arrays.sort(tIdColumnValues);
+                Arrays.sort(k1ColumnValue);
+                Arrays.sort(k3ColumnValue);
+                assertTrue(Arrays.equals(strings, tIdColumnValues));
+                for(int m=0;m<26;m++) {
+                    assertEquals(m, k1ColumnValue[m]);
+                    assertEquals(m+2, k3ColumnValue[m]);
                 }
             }
        } finally {
@@ -1013,24 +1034,6 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
             assertEquals(5, regionsOfIndexTable.size());
             boolean success = latch1.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS);
             assertTrue("Timed out waiting for MockedLocalIndexSplitter.preSplitAfterPONR to complete", success);
-            // Verify the metadata for index is correct.
-            rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indexName,
-                    new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName, rs.getString(3));
-            assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-            rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indexName+"_2",
-                new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName+"_2", rs.getString(3));
-            assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-
-            String query = "SELECT t_id,k1,v1 FROM " + tableName+"2";
-            rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
-            assertEquals("CLIENT PARALLEL " + 1 + "-WAY FULL SCAN OVER " + tableName+"2",
-                QueryUtil.getExplainPlan(rs));
             latch2.countDown();
        } finally {
             conn1.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index add282e..80f1250 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -31,14 +32,25 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import jline.internal.Log;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -86,8 +98,8 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
 	
 	@Parameters(name="localIndex = {0} , transactional = {1}")
     public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {     
-                 { false, false }, { false, true }, { true, false }, { true, true }
+        return Arrays.asList(new Boolean[][] {
+                { false, false }, { false, true }, { true, false }, { true, true }
            });
     }
     
@@ -594,4 +606,227 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
         }
     }
 
+    @Test
+    public void testSplitDuringIndexScan() throws Exception {
+        testSplitDuringIndexScan(false);
+    }
+    
+    @Test
+    public void testSplitDuringIndexReverseScan() throws Exception {
+        testSplitDuringIndexScan(true);
+    }
+
+    private void testSplitDuringIndexScan(boolean isReverse) throws Exception {
+        Properties props = new Properties();
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(2));
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+        try(Connection conn1 = DriverManager.getConnection(getUrl(), props)){
+            String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
+            HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+            dropTable(admin, conn1);
+            createTableAndLoadData(conn1, strings, isReverse);
+
+            ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + tableName);
+            assertTrue(rs.next());
+            splitDuringScan(conn1, strings, admin, isReverse);
+            dropTable(admin, conn1);
+        }
+    }
+
+    private void dropTable(HBaseAdmin admin, Connection conn) throws SQLException, IOException {
+        conn.createStatement().execute("DROP TABLE IF EXISTS "+ tableName);
+        if(admin.tableExists(tableName)) {
+            admin.disableTable(TableName.valueOf(tableName));
+            admin.deleteTable(TableName.valueOf(tableName));
+        } 
+        if(admin.tableExists(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName)) {
+            admin.disableTable(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+            admin.deleteTable(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+        }
+    }
+
+    private void createTableAndLoadData(Connection conn1, String[] strings, boolean isReverse) throws SQLException {
+        createBaseTable(conn1, tableName, null);
+        for (int i = 0; i < 26; i++) {
+            conn1.createStatement().execute(
+                "UPSERT INTO " + tableName + " values('"+strings[i]+"'," + i + ","
+                        + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn1.commit();
+        conn1.createStatement().execute(
+            "CREATE " + (localIndex ? "LOCAL" : "")+" INDEX " + indexName + " ON " + tableName + "(v1"+(isReverse?" DESC":"")+") include (k3)");
+    }
+
+    @Test
+    public void testIndexHalfStoreFileReader() throws Exception {
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        try {
+            dropTable(admin, conn1);
+            createBaseTable(conn1, tableName, "('e')");
+            conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+" INDEX " + indexName + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON ('e')"));
+            conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
+            conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
+            conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
+            conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
+            conn1.commit();
+
+            String query = "SELECT count(*) FROM " + tableName +" where v1<='z'";
+            ResultSet rs = conn1.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+
+            TableName table = TableName.valueOf(localIndex?tableName: indexName);
+            TableName indexTable = TableName.valueOf(localIndex?MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+            admin.flush(indexTable);
+            boolean merged = false;
+            // merge regions until 1 left
+            end: while (true) {
+              long numRegions = 0;
+              while (true) {
+                rs = conn1.createStatement().executeQuery(query);
+                assertTrue(rs.next());
+                System.out.println("Number of rows returned:" + rs.getInt(1));
+                assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results?
+                try {
+                  List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable);
+                  numRegions = indexRegions.size();
+                  if (numRegions==1) {
+                    break end;
+                  }
+                  if(!merged) {
+                            List<HRegionInfo> regions =
+                                    admin.getTableRegions(localIndex ? table : indexTable);
+                      System.out.println("Merging: " + regions.size());
+                      admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
+                          regions.get(1).getEncodedNameAsBytes(), false);
+                      merged = true;
+                      Threads.sleep(10000);
+                  }
+                  break;
+                } catch (Exception ex) {
+                  Log.info(ex);
+                }
+
+                long waitStartTime = System.currentTimeMillis();
+                // wait until merge happened
+                while (System.currentTimeMillis() - waitStartTime < 10000) {
+                  List<HRegionInfo> regions = admin.getTableRegions(indexTable);
+                  System.out.println("Waiting:" + regions.size());
+                  if (regions.size() < numRegions) {
+                    break;
+                  }
+                  Threads.sleep(1000);
+                }
+              }
+            }
+        } finally {
+            dropTable(admin, conn1);
+        }
+    }
+
+    private List<HRegionInfo> mergeRegions(HBaseAdmin admin, List<HRegionInfo> regionsOfUserTable)
+            throws IOException, InterruptedException {
+        for (int i = 2; i > 0; i--) {
+            Threads.sleep(10000);
+            admin.mergeRegions(regionsOfUserTable.get(0).getEncodedNameAsBytes(),
+                regionsOfUserTable.get(1).getEncodedNameAsBytes(), false);
+            regionsOfUserTable =
+                    MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), admin.getConnection(),
+                        TableName.valueOf(localIndex? tableName:indexName), false);
+
+            while (regionsOfUserTable.size() != i) {
+                Thread.sleep(100);
+                regionsOfUserTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+                    admin.getConnection(), TableName.valueOf(localIndex? tableName:indexName), false);
+            }
+            assertEquals(i, regionsOfUserTable.size());
+            if(localIndex) {
+                List<HRegionInfo> regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+                    admin.getConnection(), TableName.valueOf(MetaDataUtil.getLocalIndexTableName(tableName)), false);
+               while (regionsOfIndexTable.size() != i) {
+                   Thread.sleep(100);
+                   regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+                       admin.getConnection(), TableName.valueOf(MetaDataUtil.getLocalIndexTableName(tableName)), false);
+               }
+               assertEquals(i, regionsOfIndexTable.size());
+            }
+        }
+        return regionsOfUserTable;
+    }
+
+    private List<HRegionInfo> splitDuringScan(Connection conn1, String[] strings, HBaseAdmin admin, boolean isReverse)
+            throws SQLException, IOException, InterruptedException {
+        ResultSet rs;
+        String query = "SELECT t_id,k1,v1 FROM " + tableName;
+        rs = conn1.createStatement().executeQuery(query);
+        String[] tIdColumnValues = new String[26]; 
+        String[] v1ColumnValues = new String[26];
+        int[] k1ColumnValue = new int[26];
+        for (int j = 0; j < 5; j++) {
+            assertTrue(rs.next());
+            tIdColumnValues[j] = rs.getString("t_id");
+            k1ColumnValue[j] = rs.getInt("k1");
+            v1ColumnValues[j] = rs.getString("V1");
+        }
+
+        String[] splitKeys = new String[2];
+        splitKeys[0] = strings[4];
+        splitKeys[1] = strings[12];
+
+        int[] splitInts = new int[2];
+        splitInts[0] = 22;
+        splitInts[1] = 4;
+        List<HRegionInfo> regionsOfUserTable = null;
+        for(int i = 0; i <=1; i++) {
+            Threads.sleep(10000);
+            if(localIndex) {
+                admin.split(Bytes.toBytes(tableName),
+                    ByteUtil.concat(Bytes.toBytes(splitKeys[i])));
+            } else {
+                admin.split(Bytes.toBytes(indexName), ByteUtil.concat(Bytes.toBytes(splitInts[i])));
+            }
+            Thread.sleep(100);
+            regionsOfUserTable =
+                    MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+                        admin.getConnection(), TableName.valueOf(localIndex?tableName:indexName),
+                        false);
+
+            while (regionsOfUserTable.size() != (i+2)) {
+                Thread.sleep(100);
+                regionsOfUserTable =
+                        MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+                            admin.getConnection(),
+                            TableName.valueOf(localIndex?tableName:indexName), false);
+            }
+            assertEquals(i+2, regionsOfUserTable.size());
+        }
+        for (int j = 5; j < 26; j++) {
+            assertTrue(rs.next());
+            tIdColumnValues[j] = rs.getString("t_id");
+            k1ColumnValue[j] = rs.getInt("k1");
+            v1ColumnValues[j] = rs.getString("V1");
+        }
+        Arrays.sort(tIdColumnValues);
+        Arrays.sort(v1ColumnValues);
+        Arrays.sort(k1ColumnValue);
+        assertTrue(Arrays.equals(strings, tIdColumnValues));
+        assertTrue(Arrays.equals(strings, v1ColumnValues));
+        for(int i=0;i<26;i++) {
+            assertEquals(i, k1ColumnValue[i]);
+        }
+        assertFalse(rs.next());
+        return regionsOfUserTable;
+    }
+
+    private void createBaseTable(Connection conn, String tableName, String splits) throws SQLException {
+        String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
+                "k1 INTEGER NOT NULL,\n" +
+                "k2 INTEGER NOT NULL,\n" +
+                "k3 INTEGER,\n" +
+                "v1 VARCHAR,\n" +
+                "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
+                        + (tableDDLOptions!=null?tableDDLOptions:"") + (splits != null ? (" split on " + splits) : "");
+        conn.createStatement().execute(ddl);
+    }
 }
\ No newline at end of file

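The split-during-scan tests above hinge largely on one client-side setting: a scanner cache of two rows (QueryServices.SCAN_CACHE_SIZE_ATTRIB) keeps the ResultSet fetching in small batches, so the splits issued by splitDuringScan() land between fetches while iteration is still in progress. A minimal sketch of that client pattern follows; the class name, the "jdbc:phoenix:localhost" URL and the table T are illustrative placeholders, not part of the patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.Properties;

    import org.apache.phoenix.query.QueryServices;

    public class SmallCacheScan {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Fetch only two rows per scanner RPC, as in testSplitDuringIndexScan().
            props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, "2");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 ResultSet rs = conn.createStatement().executeQuery("SELECT t_id, k1, v1 FROM T")) {
                while (rs.next()) {
                    // Region splits happening mid-iteration must not drop or duplicate rows.
                    System.out.println(rs.getString(1) + "," + rs.getInt(2) + "," + rs.getString(3));
                }
            }
        }
    }
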
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 74deb71..55bed91 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
@@ -39,13 +40,13 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
     
     @Override
     public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
-            CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException {
-        return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+            CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+        return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
     }
     
     private class DelayedTableResultIterator extends TableResultIterator {
-        public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException {
-            super(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+        public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+            super(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
         }
         
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
index d8a08e6..b5c5f0f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.schema.PTable;
 
@@ -33,7 +34,7 @@ public class MockParallelIteratorFactory implements ParallelIteratorFactory {
     
     @Override
     public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan,
-            String physicalTableName) throws SQLException {
+            String physicalTableName, QueryPlan plan) throws SQLException {
         return new MockResultIterator(String.valueOf(counter.incrementAndGet()), table);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
index cbc4ed6..d1d12fb 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
@@ -18,27 +18,18 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.index.IndexMaintainer;
 
 /**
@@ -57,7 +48,6 @@ import org.apache.phoenix.index.IndexMaintainer;
  */
 
 public class IndexHalfStoreFileReader extends StoreFile.Reader {
-    private static final int ROW_KEY_LENGTH = 2;
     private final boolean top;
     // This is the key we split around. It's the first possible entry on a row:
     // i.e. empty column and a timestamp of LATEST_TIMESTAMP.
@@ -73,36 +63,6 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
      * @param fs
      * @param p
      * @param cacheConf
-     * @param r
-     * @param conf
-     * @param indexMaintainers
-     * @param viewConstants
-     * @param regionInfo
-     * @param regionStartKeyInHFile
-     * @param splitKey
-     * @throws IOException
-     */
-    public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
-            final Reference r, final Configuration conf,
-            final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
-            final byte[][] viewConstants, final HRegionInfo regionInfo,
-            final byte[] regionStartKeyInHFile, byte[] splitKey) throws IOException {
-        super(fs, p, cacheConf, conf);
-        this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
-        // Is it top or bottom half?
-        this.top = Reference.isTopFileRegion(r.getFileRegion());
-        this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
-        this.indexMaintainers = indexMaintainers;
-        this.viewConstants = viewConstants;
-        this.regionInfo = regionInfo;
-        this.regionStartKeyInHFile = regionStartKeyInHFile;
-        this.offset = regionStartKeyInHFile.length;
-    }
-
-    /**
-     * @param fs
-     * @param p
-     * @param cacheConf
      * @param in
      * @param size
      * @param r
@@ -132,371 +92,35 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
         this.offset = regionStartKeyInHFile.length;
     }
 
-    protected boolean isTop() {
-        return this.top;
+    public int getOffset() {
+        return offset;
     }
 
-    @Override
-    public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
-            final boolean isCompaction) {
-        final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
-        return new HFileScanner() {
-            final HFileScanner delegate = s;
-            public boolean atEnd = false;
-
-            @Override
-            public ByteBuffer getKey() {
-                if (atEnd) {
-                    return null;
-                }
-                boolean changeBottomKeys =
-                        regionInfo.getStartKey().length == 0 && splitRow.length != offset;
-                if (!top) {
-                    // For first region we are prepending empty byte array of length region end key.
-                    // So if split row length is not equal to region end key length then we need to
-                    // replace empty bytes of split row length. Because after split end key is the split
-                    // row.
-                    if(!changeBottomKeys) return delegate.getKey();
-                }
-                // If it is top store file replace the StartKey of the Key with SplitKey
-                return getChangedKey(delegate.getKeyValue(), changeBottomKeys);
-            }
-
-            private ByteBuffer getChangedKey(Cell kv, boolean changeBottomKeys) {
-                // new KeyValue(row, family, qualifier, timestamp, type, value)
-                byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
-                KeyValue newKv =
-                        new KeyValue(newRowkey, 0, newRowkey.length, kv.getFamilyArray(),
-                                kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
-                                kv.getQualifierOffset(), kv.getQualifierLength(),
-                                kv.getTimestamp(), Type.codeToType(kv.getTypeByte()), null, 0, 0);
-                ByteBuffer keyBuffer = ByteBuffer.wrap(newKv.getKey());
-                return keyBuffer;
-            }
-
-            private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(Cell kv, boolean changeBottomKeys) {
-                int lenOfRemainingKey = kv.getRowLength() - offset;
-                byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + splitRow.length];
-                System.arraycopy(changeBottomKeys ? new byte[splitRow.length] : splitRow, 0,
-                    keyReplacedStartKey, 0, splitRow.length);
-                System.arraycopy(kv.getRowArray(), kv.getRowOffset() + offset, keyReplacedStartKey,
-                    splitRow.length, lenOfRemainingKey);
-                return keyReplacedStartKey;
-            }
-
-            @Override
-            public String getKeyString() {
-                if (atEnd) {
-                    return null;
-                }
-                return Bytes.toStringBinary(getKey());
-            }
-
-            @Override
-            public ByteBuffer getValue() {
-                if (atEnd) {
-                    return null;
-                }
-                return delegate.getValue();
-            }
-
-            @Override
-            public String getValueString() {
-                if (atEnd) {
-                    return null;
-                }
-                return Bytes.toStringBinary(getValue());
-            }
-
-            @Override
-            public Cell getKeyValue() {
-                if (atEnd) {
-                    return null;
-                }
-                Cell kv = delegate.getKeyValue();
-                boolean changeBottomKeys =
-                        regionInfo.getStartKey().length == 0 && splitRow.length != offset;
-                if (!top) {
-                    if(!changeBottomKeys) return kv;
-                }
-                // If it is a top store file change the StartKey with SplitKey in Key
-                // and produce the new value corresponding to the change in key
-                byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
-                KeyValue changedKv =
-                        new KeyValue(changedKey, 0, changedKey.length, kv.getFamilyArray(),
-                                kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
-                                kv.getQualifierOffset(), kv.getQualifierLength(),
-                                kv.getTimestamp(), Type.codeToType(kv.getTypeByte()),
-                                kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
-                                kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
-                return changedKv;
-            }
-
-            @Override
-            public boolean next() throws IOException {
-                if (atEnd) {
-                    return false;
-                }
-                while (true) {
-                    boolean b = delegate.next();
-                    if (!b) {
-                        atEnd = true;
-                        return b;
-                    }
-                    // We need to check whether the current KV pointed by this reader is
-                    // corresponding to
-                    // this split or not.
-                    // In case of top store file if the ActualRowKey >= SplitKey
-                    // In case of bottom store file if the ActualRowKey < Splitkey
-                    if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
-                        return true;
-                    }
-                }
-            }
-
-            @Override
-            public boolean seekBefore(byte[] key) throws IOException {
-                return seekBefore(key, 0, key.length);
-            }
-
-            @Override
-            public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
-
-                if (top) {
-                    byte[] fk = getFirstKey();
-                    // This will be null when the file is empty in which we can not seekBefore to
-                    // any key
-                    if (fk == null) {
-                        return false;
-                    }
-                    if (getComparator().compare(key, offset, length, fk, 0, fk.length) <= 0) {
-                        return false;
-                    }
-                    KeyValue replacedKey = getKeyPresentInHFiles(key);
-                    return this.delegate.seekBefore(replacedKey);
-                } else {
-                    // The equals sign isn't strictly necessary just here to be consistent with
-                    // seekTo
-                    if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
-                        return this.delegate.seekBefore(splitkey, 0, splitkey.length);
-                    }
-                }
-                return this.delegate.seekBefore(key, offset, length);
-            }
-
-            @Override
-            public boolean seekBefore(Cell cell) throws IOException {
-                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                return seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
-            }
-
-            @Override
-            public boolean seekTo() throws IOException {
-                boolean b = delegate.seekTo();
-                if (!b) {
-                    atEnd = true;
-                    return b;
-                }
-                while (true) {
-                    // We need to check the first occurrence of satisfying the condition
-                    // In case of top store file if the ActualRowKey >= SplitKey
-                    // In case of bottom store file if the ActualRowKey < Splitkey
-                    if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
-                        return true;
-                    }
-                    b = delegate.next();
-                    if (!b) {
-                        return b;
-                    }
-                }
-            }
-
-            @Override
-            public int seekTo(byte[] key) throws IOException {
-                return seekTo(key, 0, key.length);
-            }
-
-            @Override
-            public int seekTo(byte[] key, int offset, int length) throws IOException {
-                if (top) {
-                    if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
-                        return -1;
-                    }
-                    KeyValue replacedKey = getKeyPresentInHFiles(key);
-
-                    int seekTo =
-                            delegate.seekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(),
-                                replacedKey.getKeyLength());
-                    return seekTo;
-                    /*
-                     * if (seekTo == 0 || seekTo == -1) { return seekTo; } else if (seekTo == 1) {
-                     * boolean next = this.next(); }
-                     */
-                } else {
-                    if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
-                        // we would place the scanner in the second half.
-                        // it might be an error to return false here ever...
-                        boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
-                        if (!res) {
-                            throw new IOException(
-                                    "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
-                        }
-                        return 1;
-                    }
-                }
-                return delegate.seekTo(key, offset, length);
-            }
-
-            @Override
-            public int seekTo(Cell cell) throws IOException {
-                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                return seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
-            }
-
-            @Override
-            public int reseekTo(byte[] key) throws IOException {
-                return reseekTo(key, 0, key.length);
-            }
-
-            @Override
-            public int reseekTo(byte[] key, int offset, int length) throws IOException {
-                if (top) {
-                    if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
-                        return -1;
-                    }
-                    KeyValue replacedKey = getKeyPresentInHFiles(key);
-                    return delegate.reseekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(),
-                        replacedKey.getKeyLength());
-                } else {
-                    if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
-                        // we would place the scanner in the second half.
-                        // it might be an error to return false here ever...
-                        boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
-                        if (!res) {
-                            throw new IOException(
-                                    "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
-                        }
-                        return 1;
-                    }
-                }
-                return delegate.reseekTo(key, offset, length);
-            }
-
-            @Override
-            public int reseekTo(Cell cell) throws IOException {
-                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                return reseekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
-            }
-
-            @Override
-            public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
-                return this.delegate.getReader();
-            }
-
-            // TODO: Need to change as per IndexHalfStoreFileReader
-            @Override
-            public boolean isSeeked() {
-                return this.delegate.isSeeked();
-            }
-
-            // Added for compatibility with HBASE-13109
-            // Once we drop support for older versions, add an @override annotation here
-            // and figure out how to get the next indexed key
-            public Cell getNextIndexedKey() {
-                return null; // indicate that we cannot use the optimization
-            }
-        };
+    public byte[][] getViewConstants() {
+        return viewConstants;
     }
 
-    private boolean isSatisfiedMidKeyCondition(Cell kv) {
-        if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) {
-            // In case of a Delete type KV, let it be going to both the daughter regions.
-            // No problems in doing so. In the correct daughter region where it belongs to, this delete
-            // tomb will really delete a KV. In the other it will just hang around there with no actual
-            // kv coming for which this is a delete tomb. :)
-            return true;
-        }
-        ImmutableBytesWritable rowKey =
-                new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + offset,
-                        kv.getRowLength() - offset);
-        Entry<ImmutableBytesWritable, IndexMaintainer> entry = indexMaintainers.entrySet().iterator().next();
-        IndexMaintainer indexMaintainer = entry.getValue();
-        byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey);
-        IndexMaintainer actualIndexMaintainer = indexMaintainers.get(new ImmutableBytesWritable(viewIndexId));
-        byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, this.viewConstants);
-        int compareResult = Bytes.compareTo(dataRowKey, splitRow);
-        if (top) {
-            if (compareResult >= 0) {
-                return true;
-            }
-        } else {
-            if (compareResult < 0) {
-                return true;
-            }
-        }
-        return false;
+    public Map<ImmutableBytesWritable, IndexMaintainer> getIndexMaintainers() {
+        return indexMaintainers;
     }
 
-    /**
-     * In case of top half store, the passed key will be with the start key of the daughter region.
-     * But in the actual HFiles, the key will be with the start key of the old parent region. In
-     * order to make the real seek in the HFiles, we need to build the old key.
-     *
-     * The logic here is just replace daughter region start key with parent region start key
-     * in the key part.
-     *
-     * @param key
-     *
-     */
-    private KeyValue getKeyPresentInHFiles(byte[] key) {
-        KeyValue keyValue = new KeyValue(key);
-        int rowLength = keyValue.getRowLength();
-        int rowOffset = keyValue.getRowOffset();
-
-        int daughterStartKeyLength =
-                regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo
-                        .getStartKey().length;
-
-        // This comes incase of deletefamily
-        if (top
-                && 0 == keyValue.getValueLength()
-                && keyValue.getTimestamp() == HConstants.LATEST_TIMESTAMP
-                && Bytes.compareTo(keyValue.getRowArray(), keyValue.getRowOffset(),
-                    keyValue.getRowLength(), splitRow, 0, splitRow.length) == 0
-                && CellUtil.isDeleteFamily(keyValue)) {
-            KeyValue createFirstDeleteFamilyOnRow =
-                    KeyValueUtil.createFirstDeleteFamilyOnRow(regionStartKeyInHFile,
-                            keyValue.getFamily());
-            return createFirstDeleteFamilyOnRow;
-        }
+    public HRegionInfo getRegionInfo() {
+        return regionInfo;
+    }
 
-        short length = (short) (keyValue.getRowLength() - daughterStartKeyLength + offset);
-        byte[] replacedKey =
-                new byte[length + key.length - (rowOffset + rowLength) + ROW_KEY_LENGTH];
-        System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_KEY_LENGTH);
-        System.arraycopy(regionStartKeyInHFile, 0, replacedKey, ROW_KEY_LENGTH, offset);
-        System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + daughterStartKeyLength,
-            replacedKey, offset + ROW_KEY_LENGTH, keyValue.getRowLength()
-                    - daughterStartKeyLength);
-        System.arraycopy(key, rowOffset + rowLength, replacedKey,
-            offset + keyValue.getRowLength() - daughterStartKeyLength
-                    + ROW_KEY_LENGTH, key.length - (rowOffset + rowLength));
-        return KeyValue.createKeyValueFromKey(replacedKey);
+    public byte[] getRegionStartKeyInHFile() {
+        return regionStartKeyInHFile;
     }
 
-    @Override
-    public byte[] midkey() throws IOException {
-        // Returns null to indicate file is not splitable.
-        return null;
+    public byte[] getSplitkey() {
+        return splitkey;
     }
 
-    @Override
-    public byte[] getFirstKey() {
-        return super.getFirstKey();
+    public byte[] getSplitRow() {
+        return splitRow;
     }
 
-    @Override
-    public boolean passesKeyRangeFilter(Scan scan) {
-        return true;
+    public boolean isTop() {
+        return top;
     }
-}
+}
\ No newline at end of file

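With the scanner logic removed, IndexHalfStoreFileReader is reduced to exposing split metadata (isTop(), getSplitRow(), getSplitkey(), getOffset(), getRegionStartKeyInHFile(), getIndexMaintainers(), getViewConstants()) that the new LocalIndexStoreFileScanner consumes. A simplified sketch of the decision those accessors enable follows; it assumes the data row key has already been rebuilt through IndexMaintainer.buildDataRowKey(), as the scanner's isSatisfiedMidKeyCondition() does, and the helper class name is illustrative only:

    import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReader;
    import org.apache.hadoop.hbase.util.Bytes;

    final class HalfFileMembership {
        // Returns true if a reconstructed data row key belongs to the half of the
        // parent HFile that this reader exposes.
        static boolean belongsToThisHalf(IndexHalfStoreFileReader reader, byte[] dataRowKey) {
            int cmp = Bytes.compareTo(dataRowKey, reader.getSplitRow());
            // Top half keeps keys at or above the split row; bottom half keeps keys below it.
            return reader.isTop() ? cmp >= 0 : cmp < 0;
        }
    }
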
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
index b48314d..6cf8fa1 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,11 +53,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.AlterIndexStatement;
-import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
@@ -67,10 +65,6 @@ import org.apache.phoenix.util.QueryUtil;
 
 public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
     
-    int storeFilesCount = 0;
-    int compactedFilesCount = 0;
-    private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
-
     @Override
     public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
             FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
@@ -116,7 +110,17 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
                     // We need not change anything in the first region data because the first region start key
                     // is equal to the merged region start key. So returning the same reader.
                     if (Bytes.compareTo(mergeRegions.getFirst().getStartKey(), splitRow) == 0) {
-                        return reader;
+                        if (mergeRegions.getFirst().getStartKey().length == 0
+                                && region.getRegionInfo().getEndKey().length != mergeRegions
+                                        .getFirst().getEndKey().length) {
+                            childRegion = mergeRegions.getFirst();
+                            regionStartKeyInHFile =
+                                    mergeRegions.getFirst().getStartKey().length == 0 ? new byte[mergeRegions
+                                            .getFirst().getEndKey().length] : mergeRegions.getFirst()
+                                            .getStartKey();
+                        } else {
+                            return reader;
+                        }
                     } else {
                         childRegion = mergeRegions.getSecond();
                         regionStartKeyInHFile = mergeRegions.getSecond().getStartKey();
@@ -170,58 +174,31 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
         }
         return reader;
     }
-    
+
+    @SuppressWarnings("deprecation")
     @Override
     public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
             Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
             long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
-        InternalScanner internalScanner = super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request);
-        Collection<StoreFile> files = request.getFiles();
-        storeFilesCount = 0;
-        compactedFilesCount = 0;
-        for(StoreFile file:files) {
-            if(!file.isReference()) {
-                return internalScanner;
-            }
+        if (!scanType.equals(ScanType.COMPACT_DROP_DELETES) || s != null || !store.hasReferences()) {
+            return s;
         }
-        storeFilesCount = files.size();
-        return internalScanner;
-    }
-
-    @Override
-    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
-            StoreFile resultFile) throws IOException {
-        super.postCompact(e, store, resultFile);
-        if(storeFilesCount > 0) compactedFilesCount++;
-        if(compactedFilesCount == storeFilesCount) {
-            PhoenixConnection conn = null;
-            try {
-                conn = QueryUtil.getConnectionOnServer(e.getEnvironment().getConfiguration()).unwrap(
-                    PhoenixConnection.class);
-                MetaDataClient client = new MetaDataClient(conn);
-                String userTableName = MetaDataUtil.getUserTableName(e.getEnvironment().getRegion().getTableDesc().getNameAsString());
-                PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
-                List<PTable> indexes = dataTable.getIndexes();
-                for (PTable index : indexes) {
-                    if (index.getIndexType() == IndexType.LOCAL) {
-                        AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                            org.apache.phoenix.parse.TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                            dataTable.getTableName().getString(), false, PIndexState.ACTIVE);
-                        client.alterIndex(indexStatement);
-                    }
-                }
-                conn.commit();
-            } catch (ClassNotFoundException ex) {
-            } catch (SQLException ex) {
-            } finally {
-                if (conn != null) {
-                    try {
-                        conn.close();
-                    } catch (SQLException ex) {
-                    }
-                }
+        List<StoreFileScanner> newScanners = new ArrayList<StoreFileScanner>(scanners.size());
+        Scan scan = new Scan();
+        scan.setMaxVersions(store.getFamily().getMaxVersions());
+        boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall());
+        for(KeyValueScanner scanner: scanners) {
+            Reader reader = ((StoreFileScanner) scanner).getReader();
+            if (reader instanceof IndexHalfStoreFileReader) {
+                newScanners.add(new LocalIndexStoreFileScanner(reader, reader.getScanner(
+                    scan.getCacheBlocks(), scanUsePread, false), true, reader.getHFileReader()
+                        .hasMVCCInfo(), store.getSmallestReadPoint()));
+            } else {
+                newScanners.add(((StoreFileScanner) scanner));
             }
         }
+        return new StoreScanner(store, store.getScanInfo(), scan, newScanners,
+            scanType, store.getSmallestReadPoint(), earliestPutTs);
     }
 
     private byte[][] getViewConstants(PTable dataTable) {
@@ -254,4 +231,42 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
         }
         return viewConstants;
     }
+
+    @Override
+    public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+        final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+        final KeyValueScanner s) throws IOException {
+        if(store.hasReferences()) {
+            long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel());
+            boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall());
+            Collection<StoreFile> storeFiles = store.getStorefiles();
+            List<StoreFile> nonReferenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size());
+            List<StoreFile> referenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size());
+            List<KeyValueScanner> keyValueScanners = new ArrayList<KeyValueScanner>(store.getStorefiles().size()+1);
+            for(StoreFile storeFile : storeFiles) {
+                if (storeFile.isReference()) {
+                    referenceStoreFiles.add(storeFile);
+                } else {
+                    nonReferenceStoreFiles.add(storeFile);
+                }
+            } 
+            List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(nonReferenceStoreFiles, scan.getCacheBlocks(), scanUsePread, readPt);
+            keyValueScanners.addAll(scanners);
+            for(StoreFile sf :  referenceStoreFiles) {
+                if(sf.getReader() instanceof IndexHalfStoreFileReader) {
+                    keyValueScanners.add(new LocalIndexStoreFileScanner(sf.getReader(), sf.getReader()
+                        .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf
+                        .getReader().getHFileReader().hasMVCCInfo(), readPt));
+                } else {
+                    keyValueScanners.add(new StoreFileScanner(sf.getReader(), sf.getReader()
+                        .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf
+                        .getReader().getHFileReader().hasMVCCInfo(), readPt));
+                }
+            }
+            keyValueScanners.addAll(((HStore)store).memstore.getScanners(readPt));
+            if(!scan.isReversed()) return new StoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners);
+            return new ReversedStoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners);
+        } 
+        return s;
+    }
 }

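Both hooks above follow the same wrapping rule: a store file whose reader is an IndexHalfStoreFileReader (a reference file left behind by a local index region split) is read through the new LocalIndexStoreFileScanner, while every other file keeps the stock StoreFileScanner. A condensed sketch of that decision follows; it leaves out the pread flag, memstore scanners and reversed-scan handling the real hooks deal with, and the class and method names are illustrative, not from the patch:

    package org.apache.hadoop.hbase.regionserver;

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;

    final class LocalIndexScannerWrapping {
        static KeyValueScanner wrap(StoreFile sf, Scan scan, long readPt) throws IOException {
            StoreFile.Reader reader = sf.getReader();
            HFileScanner hfs = reader.getScanner(scan.getCacheBlocks(), false, false);
            boolean hasMVCC = reader.getHFileReader().hasMVCCInfo();
            if (reader instanceof IndexHalfStoreFileReader) {
                // Half-file reference from a split: keys are translated on the fly.
                return new LocalIndexStoreFileScanner(reader, hfs, true, hasMVCC, readPt);
            }
            return new StoreFileScanner(reader, hfs, true, hasMVCC, readPt);
        }
    }
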
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
index 63cf3ba..ba158a8 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
@@ -31,23 +31,14 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.AlterIndexStatement;
 import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.sql.SQLException;
 import java.util.List;
 
 public class LocalIndexSplitter extends BaseRegionObserver {
@@ -144,34 +135,6 @@ public class LocalIndexSplitter extends BaseRegionObserver {
             throws IOException {
         if (st == null || daughterRegions == null) return;
         RegionCoprocessorEnvironment environment = ctx.getEnvironment();
-        PhoenixConnection conn = null;
-        try {
-            conn = QueryUtil.getConnectionOnServer(ctx.getEnvironment().getConfiguration()).unwrap(
-                PhoenixConnection.class);
-            MetaDataClient client = new MetaDataClient(conn);
-            String userTableName = ctx.getEnvironment().getRegion().getTableDesc().getNameAsString();
-            PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
-            List<PTable> indexes = dataTable.getIndexes();
-            for (PTable index : indexes) {
-                if (index.getIndexType() == IndexType.LOCAL) {
-                    AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                        org.apache.phoenix.parse.TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                        dataTable.getTableName().getString(), false, PIndexState.INACTIVE);
-                    client.alterIndex(indexStatement);
-                }
-            }
-            conn.commit();
-        } catch (ClassNotFoundException ex) {
-        } catch (SQLException ex) {
-        } finally {
-            if (conn != null) {
-                try {
-                    conn.close();
-                } catch (SQLException ex) {
-                }
-            }
-        }
-
         HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
         st.stepsAfterPONR(rs, rs, daughterRegions);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
new file mode 100644
index 0000000..a6e5005
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.index.IndexMaintainer;
+
+import static org.apache.hadoop.hbase.KeyValue.ROW_LENGTH_SIZE;
+
+public class LocalIndexStoreFileScanner extends StoreFileScanner {
+
+    private IndexHalfStoreFileReader reader;
+    private boolean changeBottomKeys;
+    public LocalIndexStoreFileScanner(Reader reader, HFileScanner hfs, boolean useMVCC,
+            boolean hasMVCC, long readPt) {
+        super(reader, hfs, useMVCC, hasMVCC, readPt);
+        this.reader = ((IndexHalfStoreFileReader)super.getReader());
+        this.changeBottomKeys =
+                this.reader.getRegionInfo().getStartKey().length == 0
+                        && this.reader.getSplitRow().length != this.reader.getOffset();
+    }
+
+    @Override
+    public Cell next() throws IOException {
+        Cell next = super.next();
+        while(next !=null && !isSatisfiedMidKeyCondition(next)) {
+            next = super.next();
+        }
+        while(super.peek() != null && !isSatisfiedMidKeyCondition(super.peek())) {
+            super.next();
+        }
+        if (next!=null && (reader.isTop() || changeBottomKeys)) {
+            next = getChangedKey(next,  !reader.isTop() && changeBottomKeys);
+        } 
+        return next;
+    }
+
+    @Override
+    public Cell peek() {
+        Cell peek = super.peek();
+        if (peek != null && (reader.isTop() || changeBottomKeys)) {
+            peek = getChangedKey(peek, !reader.isTop() && changeBottomKeys);
+        } 
+        return peek;
+    }
+
+    private KeyValue getChangedKey(Cell next, boolean changeBottomKeys) {
+        // If it is a top store file, replace the region start key with the split key in the key
+        // and produce the new value corresponding to the change in key.
+        byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(next, changeBottomKeys);
+        KeyValue changedKv =
+                new KeyValue(changedKey, 0, changedKey.length, next.getFamilyArray(),
+                    next.getFamilyOffset(), next.getFamilyLength(), next.getQualifierArray(),
+                    next.getQualifierOffset(), next.getQualifierLength(),
+                    next.getTimestamp(), Type.codeToType(next.getTypeByte()),
+                    next.getValueArray(), next.getValueOffset(), next.getValueLength(),
+                    next.getTagsArray(), next.getTagsOffset(), next.getTagsLength());
+        return changedKv;
+    }
+
+    @Override
+    public boolean seek(Cell key) throws IOException {
+        return seekOrReseek(key, true);
+    }
+
+    @Override
+    public boolean reseek(Cell key) throws IOException {
+        return seekOrReseek(key, false);
+    }
+
+    @Override
+    public boolean seekToPreviousRow(Cell key) throws IOException {
+        KeyValue kv = KeyValueUtil.ensureKeyValue(key);
+        if (reader.isTop()) {
+            byte[] fk = reader.getFirstKey();
+            // This will be null when the file is empty, in which case we cannot seek before
+            // any key.
+            if (fk == null) {
+                return false;
+            }
+            if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), fk, 0, fk.length) <= 0) {
+                return super.seekToPreviousRow(key);
+            }
+            KeyValue replacedKey = getKeyPresentInHFiles(kv.getBuffer());
+            boolean seekToPreviousRow = super.seekToPreviousRow(replacedKey);
+            while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+                seekToPreviousRow = super.seekToPreviousRow(super.peek());
+            }
+            return seekToPreviousRow;
+        } else {
+            // The equals sign isn't strictly necessary; it is just here to be consistent
+            // with seekTo.
+            if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) {
+                boolean seekToPreviousRow = super.seekToPreviousRow(kv);
+                while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+                    seekToPreviousRow = super.seekToPreviousRow(super.peek());
+                }
+                return seekToPreviousRow;
+            }
+        }
+        boolean seekToPreviousRow = super.seekToPreviousRow(kv);
+        while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+            seekToPreviousRow = super.seekToPreviousRow(super.peek());
+        }
+        return seekToPreviousRow;
+    }
+
+    @Override
+    public boolean seekToLastRow() throws IOException {
+        boolean seekToLastRow = super.seekToLastRow();
+        while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+            seekToLastRow = super.seekToPreviousRow(super.peek());
+        }
+        return seekToLastRow;
+    }
+
+    private boolean isSatisfiedMidKeyCondition(Cell kv) {
+        if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) {
+            // In case of a Delete type KV, let it go to both daughter regions.
+            // There is no harm in doing so: in the daughter region it actually belongs to, this
+            // delete marker will really delete a KV; in the other it will simply hang around,
+            // with no actual KV for it to delete.
+            return true;
+        }
+        ImmutableBytesWritable rowKey =
+                new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + reader.getOffset(),
+                        kv.getRowLength() - reader.getOffset());
+        Entry<ImmutableBytesWritable, IndexMaintainer> entry = reader.getIndexMaintainers().entrySet().iterator().next();
+        IndexMaintainer indexMaintainer = entry.getValue();
+        byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey);
+        IndexMaintainer actualIndexMaintainer = reader.getIndexMaintainers().get(new ImmutableBytesWritable(viewIndexId));
+        byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, reader.getViewConstants());
+        int compareResult = Bytes.compareTo(dataRowKey, reader.getSplitRow());
+        if (reader.isTop()) {
+            if (compareResult >= 0) {
+                return true;
+            }
+        } else {
+            if (compareResult < 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * For the top half store, the passed key is prefixed with the start key of the daughter
+     * region, but in the actual HFiles the keys are prefixed with the start key of the old
+     * parent region. To perform the real seek in the HFiles, we need to rebuild the old key.
+     *
+     * The logic here simply replaces the daughter region start key with the parent region
+     * start key in the row key portion.
+     *
+     * @param key the key as seen by the daughter region
+     * @return the corresponding key as present in the parent region's HFiles
+     */
+    private KeyValue getKeyPresentInHFiles(byte[] key) {
+        KeyValue keyValue = new KeyValue(key);
+        int rowLength = keyValue.getRowLength();
+        int rowOffset = keyValue.getRowOffset();
+
+        short length = (short) (rowLength - reader.getSplitRow().length + reader.getOffset());
+        byte[] replacedKey =
+                new byte[length + key.length - (rowOffset + rowLength) + ROW_LENGTH_SIZE];
+        System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_LENGTH_SIZE);
+        System.arraycopy(reader.getRegionStartKeyInHFile(), 0, replacedKey, ROW_LENGTH_SIZE, reader.getOffset());
+        System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + reader.getSplitRow().length,
+            replacedKey, reader.getOffset() + ROW_LENGTH_SIZE, rowLength
+                    - reader.getSplitRow().length);
+        System.arraycopy(key, rowOffset + rowLength, replacedKey,
+            reader.getOffset() + keyValue.getRowLength() - reader.getSplitRow().length
+                    + ROW_LENGTH_SIZE, key.length - (rowOffset + rowLength));
+        return new KeyValue.KeyOnlyKeyValue(replacedKey);
+    }
+    
+    /**
+     * Seeks or reseeks to the given cell, rewriting its key when needed so that the seek is
+     * done against the keys actually present in the parent region's HFile.
+     * @param cell the cell to seek or reseek to
+     * @param isSeek pass true for seek, false for reseek.
+     * @return whether the seek or reseek positioned the scanner for this half of the file
+     * @throws IOException
+     */
+    public boolean seekOrReseek(Cell cell, boolean isSeek) throws IOException{
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+        KeyValue keyToSeek = kv;
+        if (reader.isTop()) {
+            if(getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) < 0){
+                if(!isSeek && realSeekDone()) {
+                    return true;
+                }
+                return seekOrReseekToProperKey(isSeek, keyToSeek);
+            }
+            keyToSeek = getKeyPresentInHFiles(kv.getBuffer());
+            return seekOrReseekToProperKey(isSeek, keyToSeek);
+        } else {
+            if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) {
+                close();
+                return false;
+            }
+            if(!isSeek && reader.getRegionInfo().getStartKey().length == 0 && reader.getSplitRow().length > reader.getRegionStartKeyInHFile().length) {
+                keyToSeek = getKeyPresentInHFiles(kv.getBuffer());
+            }
+        }
+        return seekOrReseekToProperKey(isSeek, keyToSeek);
+    }
+
+    private boolean seekOrReseekToProperKey(boolean isSeek, KeyValue kv)
+            throws IOException {
+        boolean seekOrReseek = isSeek ? super.seek(kv) : super.reseek(kv);
+        while (seekOrReseek && super.peek() != null
+                && !isSatisfiedMidKeyCondition(super.peek())) {
+            super.next();
+            seekOrReseek = super.peek() != null;
+        }
+        return seekOrReseek;
+    }
+
+    private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(Cell kv, boolean changeBottomKeys) {
+        int lenOfRemainingKey = kv.getRowLength() - reader.getOffset();
+        byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + reader.getSplitRow().length];
+        System.arraycopy(changeBottomKeys ? new byte[reader.getSplitRow().length] : reader.getSplitRow(), 0,
+                keyReplacedStartKey, 0, reader.getSplitRow().length);
+        System.arraycopy(kv.getRowArray(), kv.getRowOffset() + reader.getOffset(), keyReplacedStartKey,
+                reader.getSplitRow().length, lenOfRemainingKey);
+        return keyReplacedStartKey;
+    }
+
+}

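To make the two key manipulations above easier to follow, here is a small, self-contained sketch. It is an illustration only, not Phoenix code: it works on flat string row keys, whereas getKeyPresentInHFiles also rebuilds the row length, family, qualifier, timestamp and type portions of the KeyValue key, and isSatisfiedMidKeyCondition first derives the data row key through the IndexMaintainer. All names in the sketch are hypothetical.

import java.util.Arrays;

public class LocalIndexSplitSketch {

    // getKeyPresentInHFiles, simplified: swap the daughter region's start-key
    // prefix for the parent region's start key, which is what the keys in the
    // referenced (parent) HFile actually carry.
    static byte[] toParentKey(byte[] daughterRowKey, byte[] splitRow, byte[] parentStartKey) {
        byte[] suffix = Arrays.copyOfRange(daughterRowKey, splitRow.length, daughterRowKey.length);
        byte[] parentRowKey = new byte[parentStartKey.length + suffix.length];
        System.arraycopy(parentStartKey, 0, parentRowKey, 0, parentStartKey.length);
        System.arraycopy(suffix, 0, parentRowKey, parentStartKey.length, suffix.length);
        return parentRowKey;
    }

    // isSatisfiedMidKeyCondition, simplified: an index cell belongs to this half
    // of the split only if the data row key it points at falls on this side of
    // the split row (top half keeps >= splitRow, bottom half keeps < splitRow).
    static boolean belongsToThisHalf(byte[] dataRowKey, byte[] splitRow, boolean isTop) {
        int cmp = compare(dataRowKey, splitRow);
        return isTop ? cmp >= 0 : cmp < 0;
    }

    // Unsigned lexicographic comparison, standing in for Bytes.compareTo.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        byte[] parentStart = "a".getBytes();           // parent region started at "a"
        byte[] splitRow = "m".getBytes();              // split point; top daughter starts at "m"
        byte[] daughterKey = "m-idx-row1".getBytes();  // index row key as the top daughter sees it
        // Prints "a-idx-row1" -- the key as it is actually stored in the parent's HFile.
        System.out.println(new String(toParentKey(daughterKey, splitRow, parentStart)));
        // The top half keeps only cells whose data row key is >= the split row.
        System.out.println(belongsToThisHalf("p-data-row".getBytes(), splitRow, true));  // true
        System.out.println(belongsToThisHalf("c-data-row".getBytes(), splitRow, true));  // false
    }
}
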
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 94736ed..b52e704 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -117,6 +117,10 @@ public class ListJarsQueryPlan implements QueryPlan {
     }
 
     @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan s) throws SQLException {
+        return iterator(scanGrouper);
+    }
+    @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
         return new ResultIterator() {
             private RemoteIterator<LocatedFileStatus> listFiles = null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index ed421a7..8e63fa9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -53,7 +53,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
     abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
     
     @Override
-    public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException {
+    public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName, QueryPlan plan) throws SQLException {
         final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
         
         MutationState state = mutate(parentContext, iterator, clonedConnection);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 4dcc134..7722483 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -49,6 +49,8 @@ public interface QueryPlan extends StatementPlan {
     
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException;
     
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
+
     public long getEstimatedSize();
     
     // TODO: change once joins are supported

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 719970a..5edaead 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.compile;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -204,7 +203,7 @@ public class ScanRanges {
         scan.setStopRow(scanRange.getUpperRange());
     }
     
-    private static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
+    public static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
         if (key.length > 0) {
             byte[] newKey = new byte[key.length + prefixKeyOffset];
             int totalKeyOffset = keyOffset + prefixKeyOffset;
@@ -213,7 +212,7 @@ public class ScanRanges {
             }
             System.arraycopy(key, keyOffset, newKey, totalKeyOffset, key.length - keyOffset);
             return newKey;
-        }
+        } 
         return key;
     }
     
@@ -229,7 +228,7 @@ public class ScanRanges {
         return temp;
     }
     
-    private static byte[] stripPrefix(byte[] key, int keyOffset) {
+    public static byte[] stripPrefix(byte[] key, int keyOffset) {
         if (key.length == 0) {
             return key;
         }
@@ -388,10 +387,6 @@ public class ScanRanges {
         newScan.setAttribute(SCAN_ACTUAL_START_ROW, scanStartKey);
         newScan.setStartRow(scanStartKey);
         newScan.setStopRow(scanStopKey);
-        if(keyOffset > 0) {
-            newScan.setAttribute(STARTKEY_OFFSET, Bytes.toBytes(keyOffset));
-        }
-
         return newScan;
     }
 

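With prefixKey now public, callers outside ScanRanges can prepend a prefix onto a stored key suffix. A hedged usage sketch follows; it assumes, based on the visible hunk (the clipped interior of the method is taken to copy the first prefixKeyOffset bytes of the prefix into the new key), that the result is simply the prefix followed by the key:

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ScanRanges;

public class PrefixKeyUsageSketch {
    public static void main(String[] args) {
        byte[] keySuffix = Bytes.toBytes("key-suffix");  // key with its prefix stripped
        byte[] prefix = Bytes.toBytes("prefix-");        // bytes to prepend
        byte[] fullKey = ScanRanges.prefixKey(keySuffix, 0, prefix, prefix.length);
        // Expected output: "prefix-key-suffix", i.e. the reassembled full key.
        System.out.println(Bytes.toString(fullKey));
    }
}
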
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 54b4eb7..5e0977b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -115,6 +115,11 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+        return iterator(scanGrouper);
+    }
+        
+    @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
         final PhoenixConnection conn = stmt.getConnection();
         if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {