You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/21 07:09:28 UTC

[2/2] phoenix git commit: PHOENIX-1463 phoenix.query.timeoutMs doesn't work as expected (Samarth Jain)

PHOENIX-1463 phoenix.query.timeoutMs doesn't work as expected (Samarth Jain)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a5867237
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a5867237
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a5867237

Branch: refs/heads/4.0
Commit: a5867237706f1388086c0e4a986cc990ff838023
Parents: 3434655
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Nov 20 20:57:22 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Nov 20 22:06:51 2014 -0800

----------------------------------------------------------------------
 .../phoenix/exception/SQLExceptionCode.java     |  1 +
 .../phoenix/iterate/BaseResultIterators.java    | 53 ++++++++++++++------
 2 files changed, 38 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5867237/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5c6018d..b776007 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -290,6 +290,7 @@ public enum SQLExceptionCode {
     OUTDATED_JARS(2007, "INT09", "Outdated jars."),
     INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index metadata. "),
     UNKNOWN_ERROR_CODE(2009, "INT11", "Unknown error code"),
+    OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out")
     ;
 
     private final int errorCode;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a5867237/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c873494..446a182 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -34,6 +34,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -46,6 +47,8 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -513,20 +516,26 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
         allFutures.add(futures);
         SQLException toThrow = null;
+        int queryTimeOut = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
+        long maxQueryEndTime = System.currentTimeMillis() + queryTimeOut;
         try {
             submitWork(scans, futures, allIterators, splits.size());
-            int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
                 List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
                 for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
                     try {
-                        PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                        long timeOutForScan = maxQueryEndTime - System.currentTimeMillis();
+                        if (timeOutForScan < 0) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException(); 
+                        }
+                        PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
                         concatIterators.add(iterator);
                     } catch (ExecutionException e) {
                         try { // Rethrow as SQLException
                             throw ServerUtil.parseServerException(e);
-                        } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+                        } catch (StaleRegionBoundaryCacheException e2) { 
+                            // Catch only to try to recover from region boundary cache being out of date
                             List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
                             if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
                                 services.clearTableRegionCache(physicalTableName);
@@ -551,7 +560,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                                     // Immediate do a get (not catching exception again) and then add the iterators we
                                     // get back immediately. They'll be sorted as expected, since they're replacing the
                                     // original one.
-                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                                    long timeOutForScan = maxQueryEndTime - System.currentTimeMillis();
+                                    if (timeOutForScan < 0) {
+                                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException(); 
+                                    }
+                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
                                     iterators.add(iterator);
                                 }
                             }
@@ -563,6 +576,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
             success = true;
             return iterators;
+        } catch (TimeoutException e) {
+            // thrown when a thread times out waiting for the future.get() call to return
+            toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT)
+                    .setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms")
+                    .setRootCause(e).build().buildException();
         } catch (SQLException e) {
             toThrow = e;
         } catch (Exception e) {
@@ -605,22 +623,25 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         // Don't call cancel on already started work, as it causes the HConnection
         // to get into a funk. Instead, just cancel queued work.
         boolean cancelledWork = false;
-        for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
-            for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
-                for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
-                    // When work is rejected, we may have null futurePair entries, because
-                    // we randomize these and set them as they're submitted.
-                    if (futurePair != null) {
-                        Future<PeekingResultIterator> future = futurePair.getSecond();
-                        if (future != null) {
-                            cancelledWork |= future.cancel(false);
+        try {
+            for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
+                for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
+                    for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
+                        // When work is rejected, we may have null futurePair entries, because
+                        // we randomize these and set them as they're submitted.
+                        if (futurePair != null) {
+                            Future<PeekingResultIterator> future = futurePair.getSecond();
+                            if (future != null) {
+                                cancelledWork |= future.cancel(false);
+                            }
                         }
                     }
                 }
             }
-        }
-        if (cancelledWork) {
-            context.getConnection().getQueryServices().getExecutor().purge();
+        } finally {
+            if (cancelledWork) {
+                context.getConnection().getQueryServices().getExecutor().purge();
+            }
         }
     }