You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 06:55:29 UTC
[4/9] phoenix git commit: PHOENIX-1429 Cancel queued threads when limit reached
PHOENIX-1429 Cancel queued threads when limit reached
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9ee5d01a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9ee5d01a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9ee5d01a
Branch: refs/heads/master
Commit: 9ee5d01ab17fe0f83b55e6991d1f954ced5f3f55
Parents: 1c1f18f
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 11:37:25 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 21:50:30 2014 -0800
----------------------------------------------------------------------
.../phoenix/iterate/BaseResultIterators.java | 65 ++++++++++++++++----
.../phoenix/iterate/ConcatResultIterator.java | 38 ++++++++++--
.../phoenix/iterate/LimitingResultIterator.java | 1 +
.../iterate/MergeSortResultIterator.java | 27 +++++++-
.../apache/phoenix/iterate/ResultIterators.java | 3 +-
.../phoenix/query/BaseQueryServicesImpl.java | 6 +-
.../phoenix/query/DelegateQueryServices.java | 4 +-
.../org/apache/phoenix/query/QueryServices.java | 4 +-
.../iterate/AggregateResultScannerTest.java | 4 ++
.../iterate/ConcatResultIteratorTest.java | 15 ++++-
.../iterate/MergeSortResultIteratorTest.java | 11 +++-
11 files changed, 147 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 519c162..ade83db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -97,6 +97,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private final PTableStats tableStats;
private final byte[] physicalTableName;
private final QueryPlan plan;
+ // TODO: too much nesting here - breakup into new classes.
+ private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
+
static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
@Override
@@ -125,8 +128,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return true;
}
- public BaseResultIterators(QueryPlan plan, Integer perScanLimit)
- throws SQLException {
+ public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws SQLException {
super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
this.plan = plan;
StatementContext context = plan.getContext();
@@ -181,6 +183,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
}
this.splits = ImmutableList.copyOf(splitRanges);
+ // If split detected, this will be more than one, but that's unlikely
+ this.allFutures = Lists.newArrayListWithExpectedSize(1);
}
private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
@@ -499,6 +503,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
int numSplits = size();
List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+ allFutures.add(futures);
+ SQLException toThrow = null;
// TODO: what purpose does this scanID serve?
final UUID scanId = UUID.randomUUID();
try {
@@ -533,6 +539,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
addIterator(iterators, concatIterators);
concatIterators = Collections.emptyList();
submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
+ allFutures.add(newFutures);
for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
// Immediate do a get (not catching exception again) and then add the iterators we
@@ -551,23 +558,59 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
success = true;
return iterators;
} catch (SQLException e) {
- throw e;
+ toThrow = e;
} catch (Exception e) {
- throw ServerUtil.parseServerException(e);
+ toThrow = ServerUtil.parseServerException(e);
} finally {
- if (!success) {
- SQLCloseables.closeAllQuietly(iterators);
- // Don't call cancel on already started work, as it causes the HConnection
- // to get into a funk. Instead, just cancel queued work.
- for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
- for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
- futurePair.getSecond().cancel(false);
+ try {
+ if (!success) {
+ try {
+ close();
+ } catch (Exception e) {
+ if (toThrow == null) {
+ toThrow = ServerUtil.parseServerException(e);
+ } else {
+ toThrow.setNextException(ServerUtil.parseServerException(e));
+ }
+ } finally {
+ try {
+ SQLCloseables.closeAll(iterators);
+ } catch (Exception e) {
+ if (toThrow == null) {
+ toThrow = ServerUtil.parseServerException(e);
+ } else {
+ toThrow.setNextException(ServerUtil.parseServerException(e));
+ }
+ }
}
}
+ } finally {
+ if (toThrow != null) {
+ throw toThrow;
+ }
}
}
+ return null; // Not reachable
}
+
+ @Override
+ public void close() throws SQLException {
+ // Don't call cancel on already started work, as it causes the HConnection
+ // to get into a funk. Instead, just cancel queued work.
+ boolean cancelledWork = false;
+ for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
+ for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
+ for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
+ cancelledWork |= futurePair.getSecond().cancel(false);
+ }
+ }
+ }
+ if (cancelledWork) {
+ context.getConnection().getQueryServices().getExecutor().purge();
+ }
+ }
+
private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) {
if (!childIterators.isEmpty()) {
parentIterators.add(ConcatResultIterator.newIterator(childIterators));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index 03f8785..fcc88aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.List;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
/**
@@ -53,10 +54,33 @@ public class ConcatResultIterator implements PeekingResultIterator {
@Override
public void close() throws SQLException {
- if (iterators != null) {
- for (;index < iterators.size(); index++) {
- PeekingResultIterator iterator = iterators.get(index);
- iterator.close();
+ SQLException toThrow = null;
+ try {
+ if (resultIterators != null) {
+ resultIterators.close();
+ }
+ } catch (Exception e) {
+ toThrow = ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ if (iterators != null) {
+ for (;index < iterators.size(); index++) {
+ PeekingResultIterator iterator = iterators.get(index);
+ try {
+ iterator.close();
+ } catch (Exception e) {
+ if (toThrow == null) {
+ toThrow = ServerUtil.parseServerException(e);
+ } else {
+ toThrow.setNextException(ServerUtil.parseServerException(e));
+ }
+ }
+ }
+ }
+ } finally {
+ if (toThrow != null) {
+ throw toThrow;
+ }
}
}
}
@@ -90,7 +114,11 @@ public class ConcatResultIterator implements PeekingResultIterator {
@Override
public Tuple next() throws SQLException {
- return currentIterator().next();
+ Tuple next = currentIterator().next();
+ if (next == null) {
+ close(); // Close underlying ResultIterators to free resources sooner rather than later
+ }
+ return next;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
index 2d0ff2c..c789093 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
@@ -41,6 +41,7 @@ public class LimitingResultIterator extends DelegateResultIterator {
@Override
public Tuple next() throws SQLException {
if (rowCount++ >= limit) {
+ close(); // Free resources early
return null;
}
return super.next();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
index 2f5d941..9ef3e70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -21,9 +21,9 @@ import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ServerUtil;
/**
@@ -52,8 +52,29 @@ public abstract class MergeSortResultIterator implements PeekingResultIterator {
@Override
public void close() throws SQLException {
- if (iterators != null) {
- SQLCloseables.closeAll(iterators);
+ SQLException toThrow = null;
+ try {
+ if (resultIterators != null) {
+ resultIterators.close();
+ }
+ } catch (Exception e) {
+ toThrow = ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ if (iterators != null) {
+ SQLCloseables.closeAll(iterators);
+ }
+ } catch (Exception e) {
+ if (toThrow == null) {
+ toThrow = ServerUtil.parseServerException(e);
+ } else {
+ toThrow.setNextException(ServerUtil.parseServerException(e));
+ }
+ } finally {
+ if (toThrow != null) {
+ throw toThrow;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
index ef2b534..16f8b41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
@@ -22,8 +22,9 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.SQLCloseable;
-public interface ResultIterators {
+public interface ResultIterators extends SQLCloseable {
public int size();
public List<KeyRange> getSplits();
public List<List<Scan>> getScans();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index f116695..73cc3c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.query;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.phoenix.job.JobManager;
import org.apache.phoenix.memory.GlobalMemoryManager;
@@ -35,7 +35,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
* @since 0.1
*/
public abstract class BaseQueryServicesImpl implements QueryServices {
- private final ExecutorService executor;
+ private final ThreadPoolExecutor executor;
private final MemoryManager memoryManager;
private final ReadOnlyProps props;
private final QueryOptimizer queryOptimizer;
@@ -53,7 +53,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices {
}
@Override
- public ExecutorService getExecutor() {
+ public ThreadPoolExecutor getExecutor() {
return executor;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
index 9d9a513..e58aa5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
@@ -18,7 +18,7 @@
package org.apache.phoenix.query;
import java.sql.SQLException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.optimize.QueryOptimizer;
@@ -46,7 +46,7 @@ public class DelegateQueryServices implements QueryServices {
}
@Override
- public ExecutorService getExecutor() {
+ public ThreadPoolExecutor getExecutor() {
return parent.getExecutor();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index b074fb4..ab1a8e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.query;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.http.annotation.Immutable;
import org.apache.phoenix.iterate.SpoolTooBigToDiskException;
@@ -145,7 +145,7 @@ public interface QueryServices extends SQLCloseable {
/**
* Get executor service used for parallel scans
*/
- public ExecutorService getExecutor();
+ public ThreadPoolExecutor getExecutor();
/**
* Get the memory manager used to track memory usage
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index e2f6fdb..3197676 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -154,6 +154,10 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest {
public List<List<Scan>> getScans() {
return Collections.emptyList();
}
+
+ @Override
+ public void close() throws SQLException {
+ }
};
ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index 02fdcea..cf71724 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -21,16 +21,19 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.AssertResults;
+import org.junit.Test;
@@ -78,6 +81,10 @@ public class ConcatResultIteratorTest {
public List<List<Scan>> getScans() {
return Collections.emptyList();
}
+
+ @Override
+ public void close() throws SQLException {
+ }
};
Tuple[] expectedResults = new Tuple[] {
@@ -137,6 +144,10 @@ public class ConcatResultIteratorTest {
public List<List<Scan>> getScans() {
return Collections.emptyList();
}
+
+ @Override
+ public void close() throws SQLException {
+ }
};
ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
AssertResults.assertResults(scanner, expectedResults);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
index 095027c..77e42b0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
@@ -21,16 +21,19 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.AssertResults;
+import org.junit.Test;
public class MergeSortResultIteratorTest {
@@ -83,6 +86,10 @@ public class MergeSortResultIteratorTest {
public List<List<Scan>> getScans() {
return Collections.emptyList();
}
+
+ @Override
+ public void close() throws SQLException {
+ }
};
ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
AssertResults.assertResults(scanner, expectedResults);