You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/04/21 01:29:15 UTC

phoenix git commit: PHOENIX-1894 Iterators in BaseResultIterators#submitWork should be added to a thread-safe collection.

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 a0cab3bda -> d1bba5a28


PHOENIX-1894 Iterators in BaseResultIterators#submitWork should be added to a thread-safe collection.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d1bba5a2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d1bba5a2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d1bba5a2

Branch: refs/heads/4.x-HBase-0.98
Commit: d1bba5a287841e339f400b909590b5ba9828fae4
Parents: a0cab3b
Author: Samarth <sa...@salesforce.com>
Authored: Mon Apr 20 16:29:09 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Mon Apr 20 16:29:09 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/phoenix/iterate/BaseResultIterators.java   | 6 ++++--
 .../java/org/apache/phoenix/iterate/ParallelIterators.java     | 3 ++-
 .../main/java/org/apache/phoenix/iterate/SerialIterators.java  | 3 ++-
 3 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 8d602b5..6a3847b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -29,10 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -513,7 +515,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         // Capture all iterators so that if something goes wrong, we close them all
         // The iterators list is based on the submission of work, so it may not
         // contain them all (for example if work was rejected from the queue)
-        List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size());
+        Queue<PeekingResultIterator> allIterators = new ConcurrentLinkedQueue<>();
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
         final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
         allFutures.add(futures);
@@ -680,7 +682,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            List<PeekingResultIterator> allIterators, int estFlattenedSize);
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize);
     
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index b74919b..97270ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_S
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -57,7 +58,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index ded9344..6b3b5e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -60,7 +61,7 @@ public class SerialIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor