You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 03:59:36 UTC
phoenix git commit: PHOENIX-1448 Fix resource leak when work rejected by thread executor
Repository: phoenix
Updated Branches:
refs/heads/4.0 21688a886 -> 8356b5422
PHOENIX-1448 Fix resource leak when work rejected by thread executor
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8356b542
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8356b542
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8356b542
Branch: refs/heads/4.0
Commit: 8356b5422f5395f26038161ef9dd6b1315394b00
Parents: 21688a8
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 18:59:25 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 18:59:25 2014 -0800
----------------------------------------------------------------------
.../phoenix/iterate/BaseResultIterators.java | 22 +++++++++++++-------
.../phoenix/iterate/ParallelIterators.java | 6 ++++--
.../apache/phoenix/iterate/SerialIterators.java | 6 ++++--
3 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8356b542/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 7785c54..c873494 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -504,13 +504,17 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
final ConnectionQueryServices services = context.getConnection().getQueryServices();
ReadOnlyProps props = services.getProps();
- int numSplits = size();
- List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
- final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+ int numScans = size();
+ // Capture all iterators so that if something goes wrong, we close them all
+ // The iterators list is based on the submission of work, so it may not
+ // contain them all (for example if work was rejected from the queue)
+ List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size());
+ List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
+ final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
allFutures.add(futures);
SQLException toThrow = null;
try {
- submitWork(scans, futures, splits.size());
+ submitWork(scans, futures, allIterators, splits.size());
int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
boolean clearedCache = false;
for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
@@ -540,7 +544,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// as we need these to be in order
addIterator(iterators, concatIterators);
concatIterators = Collections.emptyList();
- submitWork(newNestedScans, newFutures, newNestedScans.size());
+ submitWork(newNestedScans, newFutures, allIterators, newNestedScans.size());
allFutures.add(newFutures);
for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
@@ -576,7 +580,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
} finally {
try {
- SQLCloseables.closeAll(iterators);
+ SQLCloseables.closeAll(allIterators);
} catch (Exception e) {
if (toThrow == null) {
toThrow = ServerUtil.parseServerException(e);
@@ -604,7 +608,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
- if (futurePair != null) { // FIXME: null check should not be necessary
+ // When work is rejected, we may have null futurePair entries, because
+ // we randomize these and set them as they're submitted.
+ if (futurePair != null) {
Future<PeekingResultIterator> future = futurePair.getSecond();
if (future != null) {
cancelledWork |= future.cancel(false);
@@ -648,7 +654,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
abstract protected String getName();
abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- int estFlattenedSize);
+ List<PeekingResultIterator> allIterators, int estFlattenedSize);
@Override
public int size() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8356b542/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index d16160c..62af19a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -57,7 +57,7 @@ public class ParallelIterators extends BaseResultIterators {
@Override
protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- int estFlattenedSize) {
+ final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
// Pre-populate nestedFutures lists so that we can shuffle the scans
// and add the future to the right nested list. By shuffling the scans
// we get better utilization of the cluster since our thread executor
@@ -89,7 +89,9 @@ public class ParallelIterators extends BaseResultIterators {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
}
- return iteratorFactory.newIterator(context, scanner, scan);
+ PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan);
+ allIterators.add(iterator);
+ return iterator;
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8356b542/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 502cdf8..4be7b56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -61,7 +61,7 @@ public class SerialIterators extends BaseResultIterators {
@Override
protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- int estFlattenedSize) {
+ final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
// Pre-populate nestedFutures lists so that we can shuffle the scans
// and add the future to the right nested list. By shuffling the scans
// we get better utilization of the cluster since our thread executor
@@ -88,7 +88,9 @@ public class SerialIterators extends BaseResultIterators {
concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
}
PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
- return new LimitingPeekingResultIterator(concatIterator, limit);
+ PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit);
+ allIterators.add(iterator);
+ return iterator;
}
/**