You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/21 07:09:28 UTC
[2/2] phoenix git commit: PHOENIX-1463 phoenix.query.timeoutMs doesn't work as expected (Samarth Jain)
PHOENIX-1463 phoenix.query.timeoutMs doesn't work as expected (Samarth Jain)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a5867237
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a5867237
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a5867237
Branch: refs/heads/4.0
Commit: a5867237706f1388086c0e4a986cc990ff838023
Parents: 3434655
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Nov 20 20:57:22 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Nov 20 22:06:51 2014 -0800
----------------------------------------------------------------------
.../phoenix/exception/SQLExceptionCode.java | 1 +
.../phoenix/iterate/BaseResultIterators.java | 53 ++++++++++++++------
2 files changed, 38 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5867237/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5c6018d..b776007 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -290,6 +290,7 @@ public enum SQLExceptionCode {
OUTDATED_JARS(2007, "INT09", "Outdated jars."),
INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index metadata. "),
UNKNOWN_ERROR_CODE(2009, "INT11", "Unknown error code"),
+ OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out")
;
private final int errorCode;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5867237/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c873494..446a182 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -46,6 +47,8 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.parse.FilterableStatement;
@@ -513,20 +516,26 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
allFutures.add(futures);
SQLException toThrow = null;
+ int queryTimeOut = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
+ long maxQueryEndTime = System.currentTimeMillis() + queryTimeOut;
try {
submitWork(scans, futures, allIterators, splits.size());
- int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
boolean clearedCache = false;
for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
try {
- PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ long timeOutForScan = maxQueryEndTime - System.currentTimeMillis();
+ if (timeOutForScan < 0) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException();
+ }
+ PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
concatIterators.add(iterator);
} catch (ExecutionException e) {
try { // Rethrow as SQLException
throw ServerUtil.parseServerException(e);
- } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+ } catch (StaleRegionBoundaryCacheException e2) {
+ // Catch only to try to recover from region boundary cache being out of date
List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
services.clearTableRegionCache(physicalTableName);
@@ -551,7 +560,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Immediate do a get (not catching exception again) and then add the iterators we
// get back immediately. They'll be sorted as expected, since they're replacing the
// original one.
- PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ long timeOutForScan = maxQueryEndTime - System.currentTimeMillis();
+ if (timeOutForScan < 0) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException();
+ }
+ PeekingResultIterator iterator = newScanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
iterators.add(iterator);
}
}
@@ -563,6 +576,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
success = true;
return iterators;
+ } catch (TimeoutException e) {
+ // thrown when a thread times out waiting for the future.get() call to return
+ toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT)
+ .setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms")
+ .setRootCause(e).build().buildException();
} catch (SQLException e) {
toThrow = e;
} catch (Exception e) {
@@ -605,22 +623,25 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Don't call cancel on already started work, as it causes the HConnection
// to get into a funk. Instead, just cancel queued work.
boolean cancelledWork = false;
- for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
- for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
- for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
- // When work is rejected, we may have null futurePair entries, because
- // we randomize these and set them as they're submitted.
- if (futurePair != null) {
- Future<PeekingResultIterator> future = futurePair.getSecond();
- if (future != null) {
- cancelledWork |= future.cancel(false);
+ try {
+ for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
+ for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
+ for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
+ // When work is rejected, we may have null futurePair entries, because
+ // we randomize these and set them as they're submitted.
+ if (futurePair != null) {
+ Future<PeekingResultIterator> future = futurePair.getSecond();
+ if (future != null) {
+ cancelledWork |= future.cancel(false);
+ }
}
}
}
}
- }
- if (cancelledWork) {
- context.getConnection().getQueryServices().getExecutor().purge();
+ } finally {
+ if (cancelledWork) {
+ context.getConnection().getQueryServices().getExecutor().purge();
+ }
}
}