You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 06:55:29 UTC

[4/9] phoenix git commit: PHOENIX-1429 Cancel queued threads when limit reached

PHOENIX-1429 Cancel queued threads when limit reached


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9ee5d01a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9ee5d01a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9ee5d01a

Branch: refs/heads/master
Commit: 9ee5d01ab17fe0f83b55e6991d1f954ced5f3f55
Parents: 1c1f18f
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 11:37:25 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 21:50:30 2014 -0800

----------------------------------------------------------------------
 .../phoenix/iterate/BaseResultIterators.java    | 65 ++++++++++++++++----
 .../phoenix/iterate/ConcatResultIterator.java   | 38 ++++++++++--
 .../phoenix/iterate/LimitingResultIterator.java |  1 +
 .../iterate/MergeSortResultIterator.java        | 27 +++++++-
 .../apache/phoenix/iterate/ResultIterators.java |  3 +-
 .../phoenix/query/BaseQueryServicesImpl.java    |  6 +-
 .../phoenix/query/DelegateQueryServices.java    |  4 +-
 .../org/apache/phoenix/query/QueryServices.java |  4 +-
 .../iterate/AggregateResultScannerTest.java     |  4 ++
 .../iterate/ConcatResultIteratorTest.java       | 15 ++++-
 .../iterate/MergeSortResultIteratorTest.java    | 11 +++-
 11 files changed, 147 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 519c162..ade83db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -97,6 +97,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     private final PTableStats tableStats;
     private final byte[] physicalTableName;
     private final QueryPlan plan;
+    // TODO: too much nesting here - breakup into new classes.
+    private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
+
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
         @Override
@@ -125,8 +128,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return true;
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit)
-            throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
         this.plan = plan;
         StatementContext context = plan.getContext();
@@ -181,6 +183,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             }
         }
         this.splits = ImmutableList.copyOf(splitRanges);
+        // If split detected, this will be more than one, but that's unlikely
+        this.allFutures = Lists.newArrayListWithExpectedSize(1);
     }
 
     private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
@@ -499,6 +503,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         int numSplits = size();
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
         final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+        allFutures.add(futures);
+        SQLException toThrow = null;
         // TODO: what purpose does this scanID serve?
         final UUID scanId = UUID.randomUUID();
         try {
@@ -533,6 +539,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             addIterator(iterators, concatIterators);
                             concatIterators = Collections.emptyList();
                             submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
+                            allFutures.add(newFutures);
                             for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
                                 for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
                                     // Immediate do a get (not catching exception again) and then add the iterators we
@@ -551,23 +558,59 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             success = true;
             return iterators;
         } catch (SQLException e) {
-            throw e;
+            toThrow = e;
         } catch (Exception e) {
-            throw ServerUtil.parseServerException(e);
+            toThrow = ServerUtil.parseServerException(e);
         } finally {
-            if (!success) {
-                SQLCloseables.closeAllQuietly(iterators);
-                // Don't call cancel on already started work, as it causes the HConnection
-                // to get into a funk. Instead, just cancel queued work.
-                for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
-                    for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
-                        futurePair.getSecond().cancel(false);
+            try {
+                if (!success) {
+                    try {
+                        close();
+                    } catch (Exception e) {
+                        if (toThrow == null) {
+                            toThrow = ServerUtil.parseServerException(e);
+                        } else {
+                            toThrow.setNextException(ServerUtil.parseServerException(e));
+                        }
+                    } finally {
+                        try {
+                            SQLCloseables.closeAll(iterators);
+                        } catch (Exception e) {
+                            if (toThrow == null) {
+                                toThrow = ServerUtil.parseServerException(e);
+                            } else {
+                                toThrow.setNextException(ServerUtil.parseServerException(e));
+                            }
+                        }
                     }
                 }
+            } finally {
+                if (toThrow != null) {
+                    throw toThrow;
+                }
             }
         }
+        return null; // Not reachable
     }
     
+
+    @Override
+    public void close() throws SQLException {
+        // Don't call cancel on already started work, as it causes the HConnection
+        // to get into a funk. Instead, just cancel queued work.
+        boolean cancelledWork = false;
+        for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
+            for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
+                for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
+                    cancelledWork |= futurePair.getSecond().cancel(false);
+                }
+            }
+        }
+        if (cancelledWork) {
+            context.getConnection().getQueryServices().getExecutor().purge();
+        }
+    }
+
     private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) {
         if (!childIterators.isEmpty()) {
             parentIterators.add(ConcatResultIterator.newIterator(childIterators));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index 03f8785..fcc88aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
 
 
 /**
@@ -53,10 +54,33 @@ public class ConcatResultIterator implements PeekingResultIterator {
     
     @Override
     public void close() throws SQLException {
-        if (iterators != null) {
-            for (;index < iterators.size(); index++) {
-                PeekingResultIterator iterator = iterators.get(index);
-                iterator.close();
+        SQLException toThrow = null;
+        try {
+            if (resultIterators != null) {
+                resultIterators.close();
+            }
+        } catch (Exception e) {
+           toThrow = ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (iterators != null) {
+                    for (;index < iterators.size(); index++) {
+                        PeekingResultIterator iterator = iterators.get(index);
+                        try {
+                            iterator.close();
+                        } catch (Exception e) {
+                            if (toThrow == null) {
+                                toThrow = ServerUtil.parseServerException(e);
+                            } else {
+                                toThrow.setNextException(ServerUtil.parseServerException(e));
+                            }
+                        }
+                    }
+                }
+            } finally {
+                if (toThrow != null) {
+                    throw toThrow;
+                }
             }
         }
     }
@@ -90,7 +114,11 @@ public class ConcatResultIterator implements PeekingResultIterator {
 
     @Override
     public Tuple next() throws SQLException {
-        return currentIterator().next();
+        Tuple next = currentIterator().next();
+        if (next == null) {
+            close(); // Close underlying ResultIterators to free resources sooner rather than later
+        }
+        return next;
     }
 
 	@Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
index 2d0ff2c..c789093 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java
@@ -41,6 +41,7 @@ public class LimitingResultIterator extends DelegateResultIterator {
     @Override
     public Tuple next() throws SQLException {
         if (rowCount++ >= limit) {
+            close(); // Free resources early
             return null;
         }
         return super.next();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
index 2f5d941..9ef3e70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -21,9 +21,9 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ServerUtil;
 
 
 /**
@@ -52,8 +52,29 @@ public abstract class MergeSortResultIterator implements PeekingResultIterator {
     
     @Override
     public void close() throws SQLException {
-        if (iterators != null) {
-            SQLCloseables.closeAll(iterators);
+        SQLException toThrow = null;
+        try {
+            if (resultIterators != null) {
+                resultIterators.close();
+            }
+        } catch (Exception e) {
+           toThrow = ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (iterators != null) {
+                    SQLCloseables.closeAll(iterators);
+                }
+            } catch (Exception e) {
+                if (toThrow == null) {
+                    toThrow = ServerUtil.parseServerException(e);
+                } else {
+                    toThrow.setNextException(ServerUtil.parseServerException(e));
+                }
+            } finally {
+                if (toThrow != null) {
+                    throw toThrow;
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
index ef2b534..16f8b41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
@@ -22,8 +22,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.SQLCloseable;
 
-public interface ResultIterators {
+public interface ResultIterators extends SQLCloseable {
     public int size();
     public List<KeyRange> getSplits();
     public List<List<Scan>> getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index f116695..73cc3c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.query;
 
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.phoenix.job.JobManager;
 import org.apache.phoenix.memory.GlobalMemoryManager;
@@ -35,7 +35,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
  * @since 0.1
  */
 public abstract class BaseQueryServicesImpl implements QueryServices {
-    private final ExecutorService executor;
+    private final ThreadPoolExecutor executor;
     private final MemoryManager memoryManager;
     private final ReadOnlyProps props;
     private final QueryOptimizer queryOptimizer;
@@ -53,7 +53,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices {
     }
     
     @Override
-    public ExecutorService getExecutor() {
+    public ThreadPoolExecutor getExecutor() {
         return executor;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
index 9d9a513..e58aa5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateQueryServices.java
@@ -18,7 +18,7 @@
 package org.apache.phoenix.query;
 
 import java.sql.SQLException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.optimize.QueryOptimizer;
@@ -46,7 +46,7 @@ public class DelegateQueryServices implements QueryServices {
     }
     
     @Override
-    public ExecutorService getExecutor() {
+    public ThreadPoolExecutor getExecutor() {
         return parent.getExecutor();
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index b074fb4..ab1a8e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.query;
 
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.http.annotation.Immutable;
 import org.apache.phoenix.iterate.SpoolTooBigToDiskException;
@@ -145,7 +145,7 @@ public interface QueryServices extends SQLCloseable {
     /**
      * Get executor service used for parallel scans
      */
-    public ExecutorService getExecutor();
+    public ThreadPoolExecutor getExecutor();
     /**
      * Get the memory manager used to track memory usage
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index e2f6fdb..3197676 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -154,6 +154,10 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest {
 			public List<List<Scan>> getScans() {
 				return Collections.emptyList();
 			}
+
+            @Override
+            public void close() throws SQLException {
+            }
             
         };
         ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index 02fdcea..cf71724 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -21,16 +21,19 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
+import org.junit.Test;
 
 
 
@@ -78,6 +81,10 @@ public class ConcatResultIteratorTest {
 			public List<List<Scan>> getScans() {
 				return Collections.emptyList();
 			}
+
+            @Override
+            public void close() throws SQLException {
+            }
         };
 
         Tuple[] expectedResults = new Tuple[] {
@@ -137,6 +144,10 @@ public class ConcatResultIteratorTest {
 			public List<List<Scan>> getScans() {
 				return Collections.emptyList();
 			}
+
+            @Override
+            public void close() throws SQLException {
+            }
         };
         ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
         AssertResults.assertResults(scanner, expectedResults);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ee5d01a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
index 095027c..77e42b0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
@@ -21,16 +21,19 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
+import org.junit.Test;
 
 
 public class MergeSortResultIteratorTest {
@@ -83,6 +86,10 @@ public class MergeSortResultIteratorTest {
 			public List<List<Scan>> getScans() {
 				return Collections.emptyList();
 			}
+
+            @Override
+            public void close() throws SQLException {
+            }
         };
         ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
         AssertResults.assertResults(scanner, expectedResults);