You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/06 17:19:29 UTC
git commit: PHOENIX-1146 Detect stale client region cache on server
and retry scans in split regions
Repository: phoenix
Updated Branches:
refs/heads/3.0 b951f85bd -> 997919401
PHOENIX-1146 Detect stale client region cache on server and retry scans in split regions
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/99791940
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/99791940
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/99791940
Branch: refs/heads/3.0
Commit: 997919401e92a749029776a3677f5ef3b74bbb7e
Parents: b951f85
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Aug 6 08:22:57 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Aug 6 08:22:57 2014 -0700
----------------------------------------------------------------------
.../end2end/SkipScanAfterManualSplitIT.java | 21 +--
.../coprocessor/BaseScannerRegionObserver.java | 25 ++-
.../GroupedAggregateRegionObserver.java | 9 +-
.../phoenix/coprocessor/ScanRegionObserver.java | 12 +-
.../UngroupedAggregateRegionObserver.java | 10 +-
.../phoenix/exception/SQLExceptionCode.java | 7 +
.../phoenix/iterate/ParallelIterators.java | 156 ++++++++++++-------
.../StaleRegionBoundaryCacheException.java | 46 ++++++
.../org/apache/phoenix/util/SchemaUtil.java | 33 +++-
.../org/apache/phoenix/util/ServerUtil.java | 13 +-
10 files changed, 238 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
index 764d1e2..1731917 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
@@ -28,10 +28,8 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
@@ -44,7 +42,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@Category(HBaseManagedTimeTest.class)
@@ -71,9 +68,10 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
public static void doSetup() throws Exception {
Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
// needed for 64 region parallelization due to splitting
- props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
+ // props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
+ props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32));
// enables manual splitting on salted tables
- props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false));
+ // props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false));
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000));
setUpTestDriver(getUrl(), new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -109,22 +107,11 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
conn.close();
}
- private static void traceRegionBoundaries(ConnectionQueryServices services) throws Exception {
- List<String> boundaries = Lists.newArrayList();
- List<HRegionLocation> regions = services.getAllTableRegions(TABLE_NAME_BYTES);
- for (HRegionLocation region : regions.subList(1, regions.size())) {
- boundaries.add(Bytes.toStringBinary(region.getRegionInfo().getStartKey()));
- }
- System.out.println("Region boundaries:\n" + boundaries);
- }
-
- @Ignore
@Test
public void testManualSplit() throws Exception {
initTable();
Connection conn = DriverManager.getConnection(getUrl());
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
- traceRegionBoundaries(services);
int nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
int nInitialRegions = nRegions;
HBaseAdmin admin = services.getAdmin();
@@ -144,7 +131,6 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
String query = "SELECT /*+ NO_INTRA_REGION_PARALLELIZATION */ count(*) FROM S WHERE a IN ('tl','jt')";
ResultSet rs1 = conn.createStatement().executeQuery(query);
assertTrue(rs1.next());
- traceRegionBoundaries(services);
nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
// Region cache has been updated, as there are more regions now
assertNotEquals(nRegions, nInitialRegions);
@@ -291,6 +277,7 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT {
* See PHOENIX-1133 and PHOENIX-1136 on apache JIRA for more details.
* @throws java.sql.SQLException from Connection
*/
+ @Ignore
@Test
public void testSkipScanInListOfRVCAfterManualSplit() throws SQLException {
Connection conn = DriverManager.getConnection(getUrl());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b94bf0a..2a23f0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -19,11 +19,15 @@ package org.apache.phoenix.coprocessor;
import java.io.IOException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.util.ServerUtil;
@@ -53,7 +57,21 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
return this.getClass().getName();
}
- abstract protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable;
+
+    /**
+     * Guards against a client that planned this scan using out-of-date region boundaries.
+     * The scan is accepted only when its start row is not below the region's start key and
+     * its stop row is not above the region's end key. An empty region end key is treated as
+     * having no upper limit, so the stop row is not checked in that case.
+     *
+     * @param scan the scan being opened
+     * @param region the region the scan was sent to
+     * @throws DoNotRetryIOException wrapping a {@link StaleRegionBoundaryCacheException}
+     *         when the row range of the scan falls outside of the region
+     */
+    private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException {
+        byte[] scanStart = scan.getStartRow();
+        byte[] scanStop = scan.getStopRow();
+        byte[] regionStart = region.getStartKey();
+        byte[] regionEnd = region.getEndKey();
+        boolean startsBeforeRegion = Bytes.compareTo(scanStart, regionStart) < 0;
+        boolean stopsAfterRegion = regionEnd.length != 0 && Bytes.compareTo(scanStop, regionEnd) > 0;
+        if (!startsBeforeRegion && !stopsAfterRegion) {
+            return;
+        }
+        // Wrap the SQLException so that it travels back to the client as a non-retried IOException
+        Exception staleCache = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTableName());
+        throw new DoNotRetryIOException(staleCache.getMessage(), staleCache);
+    }
+
+ /**
+ * Tells whether this observer should handle the given scan. When this returns false,
+ * postScannerOpen hands back the incoming scanner unchanged and performs no region
+ * boundary check.
+ */
+ abstract protected boolean isRegionObserverFor(Scan scan);
+ /**
+ * Produces the scanner that postScannerOpen returns for a scan this observer handles.
+ * Called only after the scan has passed the region boundary check; anything thrown
+ * here is passed to ServerUtil.throwIOException by the caller.
+ */
+ abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable;
/**
* Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown,
@@ -63,6 +81,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@Override
public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
try {
+ if (!isRegionObserverFor(scan)) {
+ return s;
+ }
+ HRegion region = c.getEnvironment().getRegion();
+ throwIfScanOutOfRegion(scan, region);
return doPostScannerOpen(c, scan, s);
} catch (Throwable t) {
ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index d5e3deb..44e9dfa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -100,9 +100,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
if (expressionBytes == null) {
expressionBytes = scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS);
- if (expressionBytes == null) {
- return s;
- }
keyOrdered = true;
}
List<Expression> expressions = deserializeGroupByExpressions(expressionBytes);
@@ -516,4 +513,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
};
}
+
+    /**
+     * A scan is handled by this observer when it carries either the unordered or the
+     * key-ordered group by expressions attribute.
+     */
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        if (scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null) {
+            return true;
+        }
+        return scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) != null;
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 2df8649..6d8a834 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -52,7 +52,6 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -170,12 +169,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
- byte[] isScanQuery = scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY);
-
- if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, isScanQuery) == 0) {
- return s;
- }
-
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
@@ -411,6 +404,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
}
};
}
+
+    /**
+     * A scan is handled by this observer when the NON_AGGREGATE_QUERY attribute is present,
+     * whatever its value.
+     */
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        byte[] nonAggregateMarker = scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY);
+        return nonAggregateMarker != null;
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index cc311ca..566ec67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -118,11 +118,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
- byte[] isUngroupedAgg = scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG);
- if (isUngroupedAgg == null) {
- return s;
- }
-
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
RegionScanner theScanner = s;
@@ -408,4 +403,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
}
+
+    /**
+     * A scan is handled by this observer when the UNGROUPED_AGG attribute has been set on it.
+     */
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        final boolean markedAsUngroupedAgg =
+                scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null;
+        return markedAsUngroupedAgg;
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d6ff6d5..74f91e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SequenceAlreadyExistsException;
import org.apache.phoenix.schema.SequenceNotFoundException;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TypeMismatchException;
@@ -258,6 +259,12 @@ public enum SQLExceptionCode {
SPLIT_POINT_NOT_CONSTANT(1105, "XCL04", "Split points must be constants."),
BATCH_EXCEPTION(1106, "XCL05", "Exception while executing batch."),
EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH(1107, "XCL06", "An executeUpdate is prohibited when the batch is not empty. Use clearBatch to empty the batch first."),
+ STALE_REGION_BOUNDARY_CACHE(1108, "XCL07", "Cache of region boundaries are out of date.", new Factory() {
+ @Override
+ public SQLException newException(SQLExceptionInfo info) {
+ return new StaleRegionBoundaryCacheException(info.getSchemaName(), info.getTableName());
+ }
+ }),
/**
* Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 61767ee..da9a819 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -56,6 +57,7 @@ import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SQLCloseables;
@@ -66,6 +68,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
+import com.google.common.collect.Lists;
/**
@@ -228,6 +231,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
return ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits();
}
+    /**
+     * Converts region locations into key ranges by applying TO_KEY_RANGE to each one,
+     * preserving the order of the input list.
+     *
+     * @param regions the region locations to convert
+     * @return a new list holding one key range per region
+     */
+    private static List<KeyRange> toKeyRanges(List<HRegionLocation> regions) {
+        final int regionCount = regions.size();
+        List<KeyRange> ranges = Lists.newArrayListWithExpectedSize(regionCount);
+        for (int i = 0; i < regionCount; i++) {
+            HRegionLocation location = regions.get(i);
+            ranges.add(TO_KEY_RANGE.apply(location));
+        }
+        return ranges;
+    }
+
public List<KeyRange> getSplits() {
return splits;
}
@@ -243,85 +254,118 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
ReadOnlyProps props = services.getProps();
int numSplits = splits.size();
List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
- List<Pair<byte[],Future<PeekingResultIterator>>> futures = new ArrayList<Pair<byte[],Future<PeekingResultIterator>>>(numSplits);
+ List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits);
final UUID scanId = UUID.randomUUID();
try {
- ExecutorService executor = services.getExecutor();
- for (KeyRange split : splits) {
- final Scan splitScan = new Scan(this.context.getScan());
- // Intersect with existing start/stop key if the table is salted
- // If not salted, we've already intersected it. If salted, we need
- // to wait until now to intersect, as we're running parallel scans
- // on all the possible regions here.
- if (tableRef.getTable().getBucketNum() != null) {
- KeyRange minMaxRange = context.getMinMaxRange();
- if (minMaxRange != null) {
- // Add salt byte based on current split, as minMaxRange won't have it
- minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
- split = split.intersect(minMaxRange);
- }
- }
- if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
- // Delay the swapping of start/stop row until row so we don't muck with the intersect logic
- ScanUtil.swapStartStopRowIfReversed(splitScan);
- Future<PeekingResultIterator> future =
- executor.submit(new JobCallable<PeekingResultIterator>() {
-
- @Override
- public PeekingResultIterator call() throws Exception {
- // TODO: different HTableInterfaces for each thread or the same is better?
-
- StatementContext scanContext = new StatementContext(context, splitScan);
- long startTime = System.currentTimeMillis();
- ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan);
- if (logger.isDebugEnabled()) {
- logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
- }
- return iteratorFactory.newIterator(scanContext, scanner);
- }
-
- /**
- * Defines the grouping for round robin behavior. All threads spawned to process
- * this scan will be grouped together and time sliced with other simultaneously
- * executing parallel scans.
- */
- @Override
- public Object getJobId() {
- return ParallelIterators.this;
- }
- });
- futures.add(new Pair<byte[],Future<PeekingResultIterator>>(split.getLowerRange(),future));
- }
- }
-
+ submitWork(scanId, splits, futures);
int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
final int factor = ScanUtil.isReversed(this.context.getScan()) ? -1 : 1;
- // Sort futures by row key so that we have a predicatble order we're getting rows back for scans.
+ // Sort futures by row key so that we have a predictable order we're getting rows back for scans.
// We're going to wait here until they're finished anyway and this makes testing much easier.
- Collections.sort(futures, new Comparator<Pair<byte[],Future<PeekingResultIterator>>>() {
+ Collections.sort(futures, new Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() {
@Override
- public int compare(Pair<byte[], Future<PeekingResultIterator>> o1, Pair<byte[], Future<PeekingResultIterator>> o2) {
- return factor * Bytes.compareTo(o1.getFirst(), o2.getFirst());
+ public int compare(Pair<KeyRange, Future<PeekingResultIterator>> o1, Pair<KeyRange, Future<PeekingResultIterator>> o2) {
+ return factor * Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange());
}
});
- for (Pair<byte[],Future<PeekingResultIterator>> future : futures) {
- iterators.add(future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS));
+ boolean clearedCache = false;
+ byte[] tableName = tableRef.getTable().getName().getBytes();
+ for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
+ try {
+ PeekingResultIterator iterator = future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ iterators.add(iterator);
+ } catch (ExecutionException e) {
+ try { // Rethrow as SQLException
+ throw ServerUtil.parseServerException(e);
+ } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+ List<Pair<KeyRange,Future<PeekingResultIterator>>> newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2);
+ if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+ services.clearTableRegionCache(tableName);
+ clearedCache = true;
+ }
+ List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName));
+ // Intersect what was the expected boundary with all new region boundaries and
+ // resubmit just this portion of work again
+ List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits);
+ submitWork(scanId, newSubSplits, newFutures);
+ for (Pair<KeyRange,Future<PeekingResultIterator>> newFuture : newFutures) {
+ // Immediately do a get (not catching exception again) and then add the iterators we
+ // get back immediately. They'll be sorted as expected, since they're replacing the
+ // original one.
+ PeekingResultIterator iterator = newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ iterators.add(iterator);
+ }
+ }
+ }
}
success = true;
return iterators;
+ } catch (SQLException e) {
+ throw e;
} catch (Exception e) {
throw ServerUtil.parseServerException(e);
} finally {
if (!success) {
SQLCloseables.closeAllQuietly(iterators);
// Don't call cancel, as it causes the HConnection to get into a funk
-// for (Pair<byte[],Future<PeekingResultIterator>> future : futures) {
+// for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
// future.getSecond().cancel(true);
// }
}
}
}
+
+    /**
+     * Submits one scan job per split to the executor of the connection's query services and
+     * records each submitted job, paired with the split it covers, in {@code futures}.
+     * Splits whose range does not intersect the scan are not submitted.
+     *
+     * @param scanId identifier included in the debug log line of every job
+     * @param splits the key ranges to scan
+     * @param futures output list receiving a (split, future) pair per submitted job
+     */
+    private void submitWork(final UUID scanId, List<KeyRange> splits,
+            List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) {
+        ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
+        for (KeyRange split : splits) {
+            final Scan splitScan = ScanUtil.newScan(context.getScan());
+            KeyRange effectiveSplit = split;
+            // For a salted table the intersection with the min/max range has to wait until now,
+            // since parallel scans run over all the possible regions. For a table that is not
+            // salted the intersection has already been done.
+            if (tableRef.getTable().getBucketNum() != null) {
+                KeyRange minMaxRange = context.getMinMaxRange();
+                if (minMaxRange != null) {
+                    // minMaxRange has no salt byte, so prepend the one of the current split
+                    KeyRange saltedMinMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
+                    effectiveSplit = split.intersect(saltedMinMaxRange);
+                }
+            }
+            boolean useSkipScanFilter = this.context.getScanRanges().useSkipScanFilter();
+            if (!ScanUtil.intersectScanRange(splitScan, effectiveSplit.getLowerRange(), effectiveSplit.getUpperRange(), useSkipScanFilter)) {
+                continue;
+            }
+            // Delay the swapping of start/stop row until now so we don't muck with the intersect logic
+            ScanUtil.swapStartStopRowIfReversed(splitScan);
+            Future<PeekingResultIterator> pending = executor.submit(new JobCallable<PeekingResultIterator>() {
+
+                @Override
+                public PeekingResultIterator call() throws Exception {
+                    StatementContext scanContext = new StatementContext(context, splitScan);
+                    long openedAt = System.currentTimeMillis();
+                    ResultIterator tableIterator = new TableResultIterator(scanContext, tableRef, splitScan);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - openedAt) + "ms, Scan: " + splitScan);
+                    }
+                    return iteratorFactory.newIterator(scanContext, tableIterator);
+                }
+
+                /**
+                 * Defines the grouping for round robin behavior. All threads spawned to process
+                 * this scan will be grouped together and time sliced with other simultaneously
+                 * executing parallel scans.
+                 */
+                @Override
+                public Object getJobId() {
+                    return ParallelIterators.this;
+                }
+            });
+            futures.add(new Pair<KeyRange,Future<PeekingResultIterator>>(effectiveSplit, pending));
+        }
+    }
@Override
public int size() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
new file mode 100644
index 0000000..eb9d875
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * Exception reporting that cached region boundaries are out of date. It carries the SQL
+ * state and error code of {@link SQLExceptionCode#STALE_REGION_BOUNDARY_CACHE}, and its
+ * message is built from that code together with the schema and table name, when known.
+ */
+public class StaleRegionBoundaryCacheException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    // Constant for the lifetime of the class, so declared final to prevent reassignment
+    private static final SQLExceptionCode ERROR_CODE = SQLExceptionCode.STALE_REGION_BOUNDARY_CACHE;
+
+    /** Creates the exception without any schema or table name. */
+    public StaleRegionBoundaryCacheException() {
+        this(null, null);
+    }
+
+    /**
+     * Creates the exception for a table identified by the bytes of its full name.
+     *
+     * @param fullTableName full table name, split into schema and table name through SchemaUtil
+     */
+    public StaleRegionBoundaryCacheException(byte[] fullTableName) {
+        this(SchemaUtil.getSchemaNameFromFullName(fullTableName),SchemaUtil.getTableNameFromFullName(fullTableName));
+    }
+
+    /**
+     * Creates the exception for a table identified by its full name.
+     *
+     * @param fullTableName full table name, split into schema and table name through SchemaUtil
+     */
+    public StaleRegionBoundaryCacheException(String fullTableName) {
+        this(SchemaUtil.getSchemaNameFromFullName(fullTableName),SchemaUtil.getTableNameFromFullName(fullTableName));
+    }
+
+    /**
+     * Creates the exception for the given schema and table.
+     *
+     * @param schemaName schema name placed into the message; may be null
+     * @param tableName table name placed into the message; may be null
+     */
+    public StaleRegionBoundaryCacheException(String schemaName, String tableName) {
+        super(new SQLExceptionInfo.Builder(ERROR_CODE).setSchemaName(schemaName).setTableName(tableName).build().toString(),
+                ERROR_CODE.getSQLState(), ERROR_CODE.getErrorCode(), null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 09f96e3..21dab0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -484,11 +484,42 @@ public class SchemaUtil {
public static String getSchemaNameFromFullName(String tableName) {
int index = tableName.indexOf(QueryConstants.NAME_SEPARATOR);
if (index < 0) {
- return "";
+ return StringUtil.EMPTY_STRING;
}
return tableName.substring(0, index);
}
+    /**
+     * Finds the first occurrence of a byte in an array.
+     *
+     * @param bytes the array to search
+     * @param b the byte to look for
+     * @return the index of the first element equal to {@code b}, or -1 if there is none
+     */
+    private static int indexOf (byte[] bytes, byte b) {
+        int position = 0;
+        while (position < bytes.length) {
+            if (bytes[position] == b) {
+                return position;
+            }
+            position++;
+        }
+        return -1;
+    }
+
+    /**
+     * Extracts the schema part of a full table name given as bytes.
+     *
+     * @param tableName the full table name; may be null
+     * @return null when {@code tableName} is null, the empty string when the name holds no
+     *         separator byte, and otherwise the bytes before the first separator as a string
+     */
+    public static String getSchemaNameFromFullName(byte[] tableName) {
+        if (tableName == null) {
+            return null;
+        }
+        int separatorAt = indexOf(tableName, QueryConstants.NAME_SEPARATOR_BYTE);
+        return separatorAt < 0
+                ? StringUtil.EMPTY_STRING
+                : Bytes.toString(tableName, 0, separatorAt);
+    }
+
+    /**
+     * Extracts the table part of a full table name given as bytes.
+     *
+     * @param tableName the full table name; may be null
+     * @return null when {@code tableName} is null, the whole name when it holds no separator
+     *         byte, and otherwise the bytes after the first separator as a string
+     */
+    public static String getTableNameFromFullName(byte[] tableName) {
+        if (tableName == null) {
+            return null;
+        }
+        int index = indexOf(tableName, QueryConstants.NAME_SEPARATOR_BYTE);
+        if (index < 0) {
+            return Bytes.toString(tableName);
+        }
+        // The third argument of Bytes.toString is a length, not an end offset, so pass the
+        // number of bytes that follow the separator rather than the length of the whole array.
+        return Bytes.toString(tableName, index + 1, tableName.length - index - 1);
+    }
+
public static String getTableNameFromFullName(String tableName) {
int index = tableName.indexOf(QueryConstants.NAME_SEPARATOR);
if (index < 0) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/99791940/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 58a3478..45a6f14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -25,9 +25,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
public class ServerUtil {
@@ -110,9 +110,14 @@ public class ServerUtil {
// If the message matches the standard pattern, recover the SQLException and throw it.
Matcher matcher = PATTERN.matcher(t.getLocalizedMessage());
if (matcher.find()) {
- int errorCode = Integer.parseInt(matcher.group(1));
- String sqlState = matcher.group(2);
- return new SQLException(matcher.group(), sqlState, errorCode, t);
+ int statusCode = Integer.parseInt(matcher.group(1));
+ SQLExceptionCode code;
+ try {
+ code = SQLExceptionCode.fromErrorCode(statusCode);
+ } catch (SQLException e) {
+ return e;
+ }
+ return new SQLExceptionInfo.Builder(code).setMessage(matcher.group()).build().buildException();
}
}
return null;