You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/04/21 01:29:15 UTC
phoenix git commit: PHOENIX-1894 Iterators in BaseResultIterators#submitWork should be added to a thread safe collection.
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 a0cab3bda -> d1bba5a28
PHOENIX-1894 Iterators in BaseResultIterators#submitWork should be added to a thread safe collection.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d1bba5a2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d1bba5a2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d1bba5a2
Branch: refs/heads/4.x-HBase-0.98
Commit: d1bba5a287841e339f400b909590b5ba9828fae4
Parents: a0cab3b
Author: Samarth <sa...@salesforce.com>
Authored: Mon Apr 20 16:29:09 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Mon Apr 20 16:29:09 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/phoenix/iterate/BaseResultIterators.java | 6 ++++--
.../java/org/apache/phoenix/iterate/ParallelIterators.java | 3 ++-
.../main/java/org/apache/phoenix/iterate/SerialIterators.java | 3 ++-
3 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 8d602b5..6a3847b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -29,10 +29,12 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
+import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -513,7 +515,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Capture all iterators so that if something goes wrong, we close them all
// The iterators list is based on the submission of work, so it may not
// contain them all (for example if work was rejected from the queue)
- List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size());
+ Queue<PeekingResultIterator> allIterators = new ConcurrentLinkedQueue<>();
List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
allFutures.add(futures);
@@ -680,7 +682,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
abstract protected String getName();
abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- List<PeekingResultIterator> allIterators, int estFlattenedSize);
+ Queue<PeekingResultIterator> allIterators, int estFlattenedSize);
@Override
public int size() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index b74919b..97270ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_S
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -57,7 +58,7 @@ public class ParallelIterators extends BaseResultIterators {
@Override
protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
+ final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
// Pre-populate nestedFutures lists so that we can shuffle the scans
// and add the future to the right nested list. By shuffling the scans
// we get better utilization of the cluster since our thread executor
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1bba5a2/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index ded9344..6b3b5e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -60,7 +61,7 @@ public class SerialIterators extends BaseResultIterators {
@Override
protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
+ final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
// Pre-populate nestedFutures lists so that we can shuffle the scans
// and add the future to the right nested list. By shuffling the scans
// we get better utilization of the cluster since our thread executor