Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/06 17:19:29 UTC

git commit: PHOENIX-1146 Detect stale client region cache on server and retry scans in split regions

Repository: phoenix
Updated Branches:
  refs/heads/3.0 b951f85bd -> 997919401


PHOENIX-1146 Detect stale client region cache on server and retry scans in split regions


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/99791940
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/99791940
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/99791940

Branch: refs/heads/3.0
Commit: 997919401e92a749029776a3677f5ef3b74bbb7e
Parents: b951f85
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Aug 6 08:22:57 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Aug 6 08:22:57 2014 -0700

----------------------------------------------------------------------
 .../end2end/SkipScanAfterManualSplitIT.java     |  21 +--
 .../coprocessor/BaseScannerRegionObserver.java  |  25 ++-
 .../GroupedAggregateRegionObserver.java         |   9 +-
 .../phoenix/coprocessor/ScanRegionObserver.java |  12 +-
 .../UngroupedAggregateRegionObserver.java       |  10 +-
 .../phoenix/exception/SQLExceptionCode.java     |   7 +
 .../phoenix/iterate/ParallelIterators.java      | 156 ++++++++++++-------
 .../StaleRegionBoundaryCacheException.java      |  46 ++++++
 .../org/apache/phoenix/util/SchemaUtil.java     |  33 +++-
 .../org/apache/phoenix/util/ServerUtil.java     |  13 +-
 10 files changed, 238 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
index 764d1e2..1731917 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
@@ -28,10 +28,8 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -44,7 +42,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 @Category(HBaseManagedTimeTest.class)
@@ -71,9 +68,10 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
         // needed for 64 region parallelization due to splitting
-        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
+        // props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
+        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32));
         // enables manual splitting on salted tables
-        props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false));
+        // props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false));
         props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000));
         setUpTestDriver(getUrl(), new ReadOnlyProps(props.entrySet().iterator()));
     }
@@ -109,22 +107,11 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
         conn.close();
     }
     
-    private static void traceRegionBoundaries(ConnectionQueryServices services) throws Exception {
-        List<String> boundaries = Lists.newArrayList();
-        List<HRegionLocation> regions = services.getAllTableRegions(TABLE_NAME_BYTES);
-        for (HRegionLocation region : regions.subList(1, regions.size())) {
-            boundaries.add(Bytes.toStringBinary(region.getRegionInfo().getStartKey()));
-        }
-        System.out.println("Region boundaries:\n" + boundaries);
-    }
-
-    @Ignore
     @Test
     public void testManualSplit() throws Exception {
         initTable();
         Connection conn = DriverManager.getConnection(getUrl());
         ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-        traceRegionBoundaries(services);
         int nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
         int nInitialRegions = nRegions;
         HBaseAdmin admin = services.getAdmin();
@@ -144,7 +131,6 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
             String query = "SELECT /*+ NO_INTRA_REGION_PARALLELIZATION */ count(*) FROM S WHERE a IN ('tl','jt')";
             ResultSet rs1 = conn.createStatement().executeQuery(query);
             assertTrue(rs1.next());
-            traceRegionBoundaries(services);
             nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
             // Region cache has been updated, as there are more regions now
             assertNotEquals(nRegions, nInitialRegions);
@@ -291,6 +277,7 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
      * See PHOENIX-1133 and PHOENIX-1136 on apache JIRA for more details.
      * @throws java.sql.SQLException  from Connection
      */
+    @Ignore
     @Test
     public void testSkipScanInListOfRVCAfterManualSplit() throws SQLException {
         Connection conn = DriverManager.getConnection(getUrl());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b94bf0a..2a23f0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -19,11 +19,15 @@ package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.util.ServerUtil;
 
 
@@ -53,7 +57,21 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
         return this.getClass().getName();
     }
     
-    abstract protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable;
+    
+    private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException {
+        byte[] lowerInclusiveScanKey = scan.getStartRow();
+        byte[] upperExclusiveScanKey = scan.getStopRow();
+        byte[] lowerInclusiveRegionKey = region.getStartKey();
+        byte[] upperExclusiveRegionKey = region.getEndKey();
+        if (Bytes.compareTo(lowerInclusiveScanKey, lowerInclusiveRegionKey) < 0 ||
+            (Bytes.compareTo(upperExclusiveScanKey, upperExclusiveRegionKey) > 0 && upperExclusiveRegionKey.length != 0) ) {
+            Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTableName());
+            throw new DoNotRetryIOException(cause.getMessage(), cause);
+        }
+    }
+
+    abstract protected boolean isRegionObserverFor(Scan scan);
+    abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable;
     
     /**
      * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown,
@@ -63,6 +81,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     @Override
     public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
         try {
+            if (!isRegionObserverFor(scan)) {
+                return s;
+            }
+            HRegion region = c.getEnvironment().getRegion();
+            throwIfScanOutOfRegion(scan, region);
             return doPostScannerOpen(c, scan, s);
         } catch (Throwable t) {
             ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
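
The check above relies on HBase's lexicographic key ordering: a scan whose start row precedes the region's start key, or whose stop row runs past the region's end key, can only have been built from stale boundaries, so the server fails fast with a DoNotRetryIOException wrapping the new StaleRegionBoundaryCacheException. The length check guards the last region of a table, whose end key is the empty byte array. A minimal standalone sketch of the same comparison (the region keys below are made-up values for illustration):

    import org.apache.hadoop.hbase.util.Bytes;

    public class StaleBoundarySketch {
        // True if [scanStart, scanStop) is not contained in [regionStart, regionEnd);
        // an empty regionEnd means the region is unbounded above (last region).
        static boolean scanOutOfRegion(byte[] scanStart, byte[] scanStop,
                                       byte[] regionStart, byte[] regionEnd) {
            return Bytes.compareTo(scanStart, regionStart) < 0
                || (regionEnd.length != 0 && Bytes.compareTo(scanStop, regionEnd) > 0);
        }

        public static void main(String[] args) {
            // Scan built before a split at "d": starts ahead of this region's start key.
            System.out.println(scanOutOfRegion(
                Bytes.toBytes("a"), Bytes.toBytes("k"),
                Bytes.toBytes("d"), Bytes.toBytes("m")));  // true -> stale cache
            // Last region (empty end key) contains everything from "m" onward.
            System.out.println(scanOutOfRegion(
                Bytes.toBytes("p"), Bytes.toBytes("z"),
                Bytes.toBytes("m"), new byte[0]));         // false -> scan is fine
        }
    }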

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index d5e3deb..44e9dfa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -100,9 +100,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
 
         if (expressionBytes == null) {
             expressionBytes = scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS);
-            if (expressionBytes == null) {
-                return s;
-            }
             keyOrdered = true;
         }
         List<Expression> expressions = deserializeGroupByExpressions(expressionBytes);
@@ -516,4 +513,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             }
         };
     }
+
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null ||
+               scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) != null;
+    }
 }
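
The removed early return is the other half of the refactoring: previously each observer silently passed the scanner through when the scan carried none of its attributes, so there was no single place to validate region boundaries. With isRegionObserverFor as a template method, the base class filters first and validates before delegating. A condensed, self-contained sketch of that shape (simplified types, not the actual HBase interfaces):

    import java.util.Map;

    abstract class ObserverSketch {
        // Subclasses declare ownership of a scan via its attributes.
        protected abstract boolean isObserverFor(Map<String, byte[]> scanAttrs);
        protected abstract String doOpen(Map<String, byte[]> scanAttrs);

        // The base class owns both the ownership filter and the boundary
        // check, so no subclass can accidentally skip the stale-cache test.
        final String open(Map<String, byte[]> scanAttrs) {
            if (!isObserverFor(scanAttrs)) {
                return "pass-through";
            }
            // throwIfScanOutOfRegion(...) runs here in the real base class
            return doOpen(scanAttrs);
        }
    }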

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 2df8649..6d8a834 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -52,7 +52,6 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -170,12 +169,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
 
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
-        byte[] isScanQuery = scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY);
-
-        if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, isScanQuery) == 0) {
-            return s;
-        }
-        
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
@@ -411,6 +404,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             }
         };
     }
+
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
+    }
     
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index cc311ca..566ec67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -118,11 +118,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
-        byte[] isUngroupedAgg = scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG);
-        if (isUngroupedAgg == null) {
-            return s;
-        }
-        
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         RegionScanner theScanner = s;
@@ -408,4 +403,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
         }
     }
+
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d6ff6d5..74f91e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SequenceAlreadyExistsException;
 import org.apache.phoenix.schema.SequenceNotFoundException;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TypeMismatchException;
@@ -258,6 +259,12 @@ public enum SQLExceptionCode {
     SPLIT_POINT_NOT_CONSTANT(1105, "XCL04", "Split points must be constants."),
     BATCH_EXCEPTION(1106, "XCL05", "Exception while executing batch."),
     EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH(1107, "XCL06", "An executeUpdate is prohibited when the batch is not empty. Use clearBatch to empty the batch first."),
+    STALE_REGION_BOUNDARY_CACHE(1108, "XCL07", "Cache of region boundaries is out of date.", new Factory() {
+        @Override
+        public SQLException newException(SQLExceptionInfo info) {
+            return new StaleRegionBoundaryCacheException(info.getSchemaName(), info.getTableName());
+        }
+    }),
     
     /**
      * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
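
The Factory hook is what preserves the exception's type across the wire: when the client later maps error code 1108 back to STALE_REGION_BOUNDARY_CACHE (see the ServerUtil change below), building through this factory yields a StaleRegionBoundaryCacheException rather than a plain SQLException, which is exactly what ParallelIterators catches to trigger a retry. A hedged usage sketch, assuming buildException routes through the code's Factory as the new entry is written to do (the schema and table names are made up):

    import java.sql.SQLException;
    import org.apache.phoenix.exception.SQLExceptionCode;
    import org.apache.phoenix.exception.SQLExceptionInfo;
    import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;

    public class StaleCodeRoundTrip {
        public static void main(String[] args) throws SQLException {
            SQLExceptionCode code = SQLExceptionCode.fromErrorCode(1108);
            SQLException e = new SQLExceptionInfo.Builder(code)
                    .setSchemaName("MYSCHEMA").setTableName("MYTABLE")
                    .build().buildException();
            // The Factory above should make this a StaleRegionBoundaryCacheException:
            System.out.println(e instanceof StaleRegionBoundaryCacheException); // true
        }
    }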

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 61767ee..da9a819 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -56,6 +57,7 @@ import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseables;
@@ -66,6 +68,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 
 
 /**
@@ -228,6 +231,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         return ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits();
     }
 
+    private static List<KeyRange> toKeyRanges(List<HRegionLocation> regions) {
+        List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(regions.size());
+        for (HRegionLocation region : regions) {
+            keyRanges.add(TO_KEY_RANGE.apply(region));
+        }
+        return keyRanges;
+    }
+    
     public List<KeyRange> getSplits() {
         return splits;
     }
@@ -243,85 +254,118 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
         ReadOnlyProps props = services.getProps();
         int numSplits = splits.size();
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
-        List<Pair<byte[],Future<PeekingResultIterator>>> futures = new ArrayList<Pair<byte[],Future<PeekingResultIterator>>>(numSplits);
+        List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits);
         final UUID scanId = UUID.randomUUID();
         try {
-            ExecutorService executor = services.getExecutor();
-            for (KeyRange split : splits) {
-                final Scan splitScan = new Scan(this.context.getScan());
-                // Intersect with existing start/stop key if the table is salted
-                // If not salted, we've already intersected it. If salted, we need
-                // to wait until now to intersect, as we're running parallel scans
-                // on all the possible regions here.
-                if (tableRef.getTable().getBucketNum() != null) {
-                    KeyRange minMaxRange = context.getMinMaxRange();
-                    if (minMaxRange != null) {
-                        // Add salt byte based on current split, as minMaxRange won't have it
-                        minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
-                        split = split.intersect(minMaxRange);
-                    }
-                }
-                if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
-                // Delay the swapping of start/stop row until now so we don't muck with the intersect logic
-                    ScanUtil.swapStartStopRowIfReversed(splitScan);
-                    Future<PeekingResultIterator> future =
-                        executor.submit(new JobCallable<PeekingResultIterator>() {
-
-                        @Override
-                        public PeekingResultIterator call() throws Exception {
-                            // TODO: different HTableInterfaces for each thread or the same is better?
-
-                            StatementContext scanContext = new StatementContext(context, splitScan);
-                        	long startTime = System.currentTimeMillis();
-                            ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan);
-                            if (logger.isDebugEnabled()) {
-                            	logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
-                            }
-                            return iteratorFactory.newIterator(scanContext, scanner);
-                        }
-
-                        /**
-                         * Defines the grouping for round robin behavior.  All threads spawned to process
-                         * this scan will be grouped together and time sliced with other simultaneously
-                         * executing parallel scans.
-                         */
-                        @Override
-                        public Object getJobId() {
-                            return ParallelIterators.this;
-                        }
-                    });
-                    futures.add(new Pair<byte[],Future<PeekingResultIterator>>(split.getLowerRange(),future));
-                }
-            }
-
+            submitWork(scanId, splits, futures);
             int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
             final int factor = ScanUtil.isReversed(this.context.getScan()) ? -1 : 1;
-            // Sort futures by row key so that we have a predicatble order we're getting rows back for scans.
+            // Sort futures by row key so that we have a predictable order in which we get rows back for scans.
             // We're going to wait here until they're finished anyway and this makes testing much easier.
-            Collections.sort(futures, new Comparator<Pair<byte[],Future<PeekingResultIterator>>>() {
+            Collections.sort(futures, new Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() {
                 @Override
-                public int compare(Pair<byte[], Future<PeekingResultIterator>> o1, Pair<byte[], Future<PeekingResultIterator>> o2) {
-                    return factor * Bytes.compareTo(o1.getFirst(), o2.getFirst());
+                public int compare(Pair<KeyRange, Future<PeekingResultIterator>> o1, Pair<KeyRange, Future<PeekingResultIterator>> o2) {
+                    return factor * Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange());
                 }
             });
-            for (Pair<byte[],Future<PeekingResultIterator>> future : futures) {
-                iterators.add(future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS));
+            boolean clearedCache = false;
+            byte[] tableName = tableRef.getTable().getName().getBytes();
+            for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
+                try {
+                    PeekingResultIterator iterator = future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                    iterators.add(iterator);
+                } catch (ExecutionException e) {
+                    try { // Rethrow as SQLException
+                        throw ServerUtil.parseServerException(e);
+                    } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+                        List<Pair<KeyRange,Future<PeekingResultIterator>>> newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2);
+                        if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+                            services.clearTableRegionCache(tableName);
+                            clearedCache = true;
+                        }
+                        List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName));
+                        // Intersect what was the expected boundary with all new region boundaries and
+                        // resubmit just this portion of work again
+                        List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits);
+                        submitWork(scanId, newSubSplits, newFutures);
+                        for (Pair<KeyRange,Future<PeekingResultIterator>> newFuture : newFutures) {
+                            // Immediately do a get (not catching the exception again) and then add the iterators we
+                            // get back immediately. They'll be sorted as expected, since they're replacing the
+                            // original one.
+                            PeekingResultIterator iterator = newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                            iterators.add(iterator);
+                        }
+                    }
+                }
             }
 
             success = true;
             return iterators;
+        } catch (SQLException e) {
+            throw e;
         } catch (Exception e) {
             throw ServerUtil.parseServerException(e);
         } finally {
             if (!success) {
                 SQLCloseables.closeAllQuietly(iterators);
                 // Don't call cancel, as it causes the HConnection to get into a funk
-//                for (Pair<byte[],Future<PeekingResultIterator>> future : futures) {
+//                for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
 //                    future.getSecond().cancel(true);
 //                }
             }
         }
     }
+    
+    private void submitWork(final UUID scanId, List<KeyRange> splits,
+            List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) {
+        final ConnectionQueryServices services = context.getConnection().getQueryServices();
+        ExecutorService executor = services.getExecutor();
+        for (KeyRange split : splits) {
+            final Scan splitScan = ScanUtil.newScan(context.getScan());
+            // Intersect with existing start/stop key if the table is salted
+            // If not salted, we've already intersected it. If salted, we need
+            // to wait until now to intersect, as we're running parallel scans
+            // on all the possible regions here.
+            if (tableRef.getTable().getBucketNum() != null) {
+                KeyRange minMaxRange = context.getMinMaxRange();
+                if (minMaxRange != null) {
+                    // Add salt byte based on current split, as minMaxRange won't have it
+                    minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
+                    split = split.intersect(minMaxRange);
+                }
+            }
+            if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
+                // Delay the swapping of start/stop row until now so we don't muck with the intersect logic
+                ScanUtil.swapStartStopRowIfReversed(splitScan);
+                Future<PeekingResultIterator> future =
+                    executor.submit(new JobCallable<PeekingResultIterator>() {
+
+                    @Override
+                    public PeekingResultIterator call() throws Exception {
+                        StatementContext scanContext = new StatementContext(context, splitScan);
+                        long startTime = System.currentTimeMillis();
+                        ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan);
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
+                        }
+                        return iteratorFactory.newIterator(scanContext, scanner);
+                    }
+
+                    /**
+                     * Defines the grouping for round robin behavior.  All threads spawned to process
+                     * this scan will be grouped together and time sliced with other simultaneously
+                     * executing parallel scans.
+                     */
+                    @Override
+                    public Object getJobId() {
+                        return ParallelIterators.this;
+                    }
+                });
+                futures.add(new Pair<KeyRange,Future<PeekingResultIterator>>(split,future));
+            }
+        }
+
+    }
 
     @Override
     public int size() {
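
The recovery path above retries at the granularity of a single failed chunk: the region cache is cleared at most once, the fresh boundaries are fetched, and only the KeyRange whose future failed is intersected with them and resubmitted, so work that already succeeded is never redone. A hedged sketch of the intersection step with illustrative keys (KeyRange.getKeyRange is assumed to build the usual inclusive-lower/exclusive-upper range, as in TO_KEY_RANGE above):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.query.KeyRange;

    public class RetrySliceSketch {
        public static void main(String[] args) {
            // The chunk [d, m) failed because its region split at "f".
            List<KeyRange> failed = Collections.singletonList(
                    KeyRange.getKeyRange(Bytes.toBytes("d"), Bytes.toBytes("m")));
            List<KeyRange> fresh = Arrays.asList(
                    KeyRange.getKeyRange(Bytes.toBytes("a"), Bytes.toBytes("f")),
                    KeyRange.getKeyRange(Bytes.toBytes("f"), Bytes.toBytes("m")),
                    KeyRange.getKeyRange(Bytes.toBytes("m"), Bytes.toBytes("z")));
            // Only [d, f) and [f, m) come back and are passed to submitWork(...).
            List<KeyRange> resubmit = KeyRange.intersect(failed, fresh);
            System.out.println(resubmit);
        }
    }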

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
new file mode 100644
index 0000000..eb9d875
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class StaleRegionBoundaryCacheException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static SQLExceptionCode ERROR_CODE = SQLExceptionCode.STALE_REGION_BOUNDARY_CACHE;
+
+    public StaleRegionBoundaryCacheException() {
+        this(null, null);
+    }
+
+    public StaleRegionBoundaryCacheException(byte[] fullTableName) {
+        this(SchemaUtil.getSchemaNameFromFullName(fullTableName),SchemaUtil.getTableNameFromFullName(fullTableName));
+    }
+
+    public StaleRegionBoundaryCacheException(String fullTableName) {
+        this(SchemaUtil.getSchemaNameFromFullName(fullTableName),SchemaUtil.getTableNameFromFullName(fullTableName));
+    }
+
+    public StaleRegionBoundaryCacheException(String schemaName, String tableName) {
+        super(new SQLExceptionInfo.Builder(ERROR_CODE).setSchemaName(schemaName).setTableName(tableName).build().toString(),
+            ERROR_CODE.getSQLState(), ERROR_CODE.getErrorCode(), null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 09f96e3..21dab0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -484,11 +484,42 @@ public class SchemaUtil {
     public static String getSchemaNameFromFullName(String tableName) {
         int index = tableName.indexOf(QueryConstants.NAME_SEPARATOR);
         if (index < 0) {
-            return ""; 
+            return StringUtil.EMPTY_STRING; 
         }
         return tableName.substring(0, index);
     }
     
+    private static int indexOf (byte[] bytes, byte b) {
+        for (int i = 0; i < bytes.length; i++) {
+            if (bytes[i] == b) {
+                return i;
+            }
+        }
+        return -1;
+    }
+    
+    public static String getSchemaNameFromFullName(byte[] tableName) {
+        if (tableName == null) {
+            return null;
+        }
+        int index = indexOf(tableName, QueryConstants.NAME_SEPARATOR_BYTE);
+        if (index < 0) {
+            return StringUtil.EMPTY_STRING; 
+        }
+        return Bytes.toString(tableName, 0, index);
+    }
+    
+    public static String getTableNameFromFullName(byte[] tableName) {
+        if (tableName == null) {
+            return null;
+        }
+        int index = indexOf(tableName, QueryConstants.NAME_SEPARATOR_BYTE);
+        if (index < 0) {
+            return Bytes.toString(tableName); 
+        }
+        return Bytes.toString(tableName, index+1, tableName.length-index-1);
+    }
+
     public static String getTableNameFromFullName(String tableName) {
         int index = tableName.indexOf(QueryConstants.NAME_SEPARATOR);
         if (index < 0) {
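
These byte[] overloads let server-side code derive schema and table names directly from the HBase table name bytes, without first materializing a String, which is what the new StaleRegionBoundaryCacheException(byte[]) constructor needs in the coprocessor path. A short usage sketch, assuming QueryConstants.NAME_SEPARATOR is "." as elsewhere in Phoenix:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.SchemaUtil;

    public class FullNameSplitSketch {
        public static void main(String[] args) {
            byte[] full = Bytes.toBytes("MYSCHEMA.MYTABLE");
            System.out.println(SchemaUtil.getSchemaNameFromFullName(full)); // MYSCHEMA
            System.out.println(SchemaUtil.getTableNameFromFullName(full));  // MYTABLE
            byte[] bare = Bytes.toBytes("MYTABLE");
            System.out.println(SchemaUtil.getSchemaNameFromFullName(bare)); // "" (empty)
            System.out.println(SchemaUtil.getTableNameFromFullName(bare));  // MYTABLE
        }
    }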

http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 58a3478..45a6f14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -25,9 +25,9 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 
 
 public class ServerUtil {
@@ -110,9 +110,14 @@ public class ServerUtil {
             // If the message matches the standard pattern, recover the SQLException and throw it.
             Matcher matcher = PATTERN.matcher(t.getLocalizedMessage());
             if (matcher.find()) {
-                int errorCode = Integer.parseInt(matcher.group(1));
-                String sqlState = matcher.group(2);
-                return new SQLException(matcher.group(), sqlState, errorCode, t);
+                int statusCode = Integer.parseInt(matcher.group(1));
+                SQLExceptionCode code;
+                try {
+                    code = SQLExceptionCode.fromErrorCode(statusCode);
+                } catch (SQLException e) {
+                    return e;
+                }
+                return new SQLExceptionInfo.Builder(code).setMessage(matcher.group()).build().buildException();
             }
         	}
         return null;
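
The remote exception reaches the client only as text, so the regex extracts the numeric error code from the message and, instead of rebuilding a generic SQLException as before, routes through SQLExceptionCode so that typed exceptions such as StaleRegionBoundaryCacheException survive the server-to-client hop. An illustrative sketch of the extraction (the real PATTERN lives in ServerUtil; this regex merely assumes messages shaped like "ERROR 1108 (XCL07): ..."):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ServerMessageParseSketch {
        private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) \\((\\w+)\\)");

        public static void main(String[] args) {
            String remote = "ERROR 1108 (XCL07): Cache of region boundaries is out of date. tableName=S";
            Matcher m = PATTERN.matcher(remote);
            if (m.find()) {
                int errorCode = Integer.parseInt(m.group(1)); // 1108
                String sqlState = m.group(2);                 // XCL07
                System.out.println(errorCode + " " + sqlState);
            }
        }
    }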