You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 03:59:36 UTC

phoenix git commit: PHOENIX-1448 Fix resource leak when work rejected by thread executor

Repository: phoenix
Updated Branches:
  refs/heads/4.0 21688a886 -> 8356b5422


PHOENIX-1448 Fix resource leak when work rejected by thread executor


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8356b542
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8356b542
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8356b542

Branch: refs/heads/4.0
Commit: 8356b5422f5395f26038161ef9dd6b1315394b00
Parents: 21688a8
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 18:59:25 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 18:59:25 2014 -0800

----------------------------------------------------------------------
 .../phoenix/iterate/BaseResultIterators.java    | 22 +++++++++++++-------
 .../phoenix/iterate/ParallelIterators.java      |  6 ++++--
 .../apache/phoenix/iterate/SerialIterators.java |  6 ++++--
 3 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8356b542/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 7785c54..c873494 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -504,13 +504,17 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ReadOnlyProps props = services.getProps();
-        int numSplits = size();
-        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
-        final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+        int numScans = size();
+        // Capture all iterators so that if something goes wrong, we close them all
+        // The iterators list is based on the submission of work, so it may not
+        // contain them all (for example if work was rejected from the queue)
+        List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size());
+        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
+        final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
         allFutures.add(futures);
         SQLException toThrow = null;
         try {
-            submitWork(scans, futures, splits.size());
+            submitWork(scans, futures, allIterators, splits.size());
             int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
@@ -540,7 +544,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             // as we need these to be in order
                             addIterator(iterators, concatIterators);
                             concatIterators = Collections.emptyList();
-                            submitWork(newNestedScans, newFutures, newNestedScans.size());
+                            submitWork(newNestedScans, newFutures, allIterators, newNestedScans.size());
                             allFutures.add(newFutures);
                             for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
                                 for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
@@ -576,7 +580,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         }
                     } finally {
                         try {
-                            SQLCloseables.closeAll(iterators);
+                            SQLCloseables.closeAll(allIterators);
                         } catch (Exception e) {
                             if (toThrow == null) {
                                 toThrow = ServerUtil.parseServerException(e);
@@ -604,7 +608,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
             for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
                 for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
-                    if (futurePair != null) { // FIXME: null check should not be necessary
+                    // When work is rejected, we may have null futurePair entries, because
+                    // we randomize these and set them as they're submitted.
+                    if (futurePair != null) {
                         Future<PeekingResultIterator> future = futurePair.getSecond();
                         if (future != null) {
                             cancelledWork |= future.cancel(false);
@@ -648,7 +654,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            int estFlattenedSize);
+            List<PeekingResultIterator> allIterators, int estFlattenedSize);
 
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8356b542/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index d16160c..62af19a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -57,7 +57,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            int estFlattenedSize) {
+            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -89,7 +89,9 @@ public class ParallelIterators extends BaseResultIterators {
                     if (logger.isDebugEnabled()) {
                         logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                     }
-                    return iteratorFactory.newIterator(context, scanner, scan);
+                    PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan);
+                    allIterators.add(iterator);
+                    return iterator;
                 }
 
                 /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8356b542/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 502cdf8..4be7b56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -61,7 +61,7 @@ public class SerialIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            int estFlattenedSize) {
+            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -88,7 +88,9 @@ public class SerialIterators extends BaseResultIterators {
 	                    concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
                 	}
                 	PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
-                	return new LimitingPeekingResultIterator(concatIterator, limit);
+                	PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit);
+                    allIterators.add(iterator);
+                    return iterator;
                 }
 
                 /**