You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/30 04:16:45 UTC
incubator-tinkerpop git commit: I bear gifts to all you peasants.
TinkerGraphComputer had a horribly stupid 'i can't believe we didn't realize
it till now' threading issue. The fix then exposed two clone() errors in the
step library (RepeatStep and Trave
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/thread-issue-tinkergraph 18e5ea9fe -> c160c3310
I bear gifts to all you peasants. TinkerGraphComputer had a horribly stupid 'i can't believe we didn't realize it till now' threading issue. The fix then exposed two clone() errors in the step library (RepeatStep and TraversalRing). There is a new GraphComputerTest that tests worker spawning. Note that GraphComputer.workers(int) exists in this branch. Anywho, by way of China, we are solving TINKERPOP3-391.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c160c331
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c160c331
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c160c331
Branch: refs/heads/thread-issue-tinkergraph
Commit: c160c3310dd6ebbe62783cf886d0f267b1a92119
Parents: 18e5ea9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 29 20:16:34 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 29 20:16:34 2015 -0600
----------------------------------------------------------------------
.../traversal/step/branch/RepeatStep.java | 28 +++++-----
.../process/traversal/util/TraversalRing.java | 4 +-
.../process/computer/GraphComputerTest.java | 54 +++-----------------
.../process/traversal/step/map/MatchTest.java | 2 +-
.../process/computer/TinkerGraphComputer.java | 2 +-
.../process/computer/TinkerMessenger.java | 7 ++-
.../process/computer/TinkerWorkerPool.java | 46 +++++++++--------
7 files changed, 56 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
index f9e1df6..d10c81f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
@@ -224,7 +224,7 @@ public final class RepeatStep<S> extends ComputerAwareStep<S, S> implements Trav
///////////////////////////////////
- public class RepeatEndStep extends ComputerAwareStep<S, S> {
+ public static class RepeatEndStep<S> extends ComputerAwareStep<S, S> {
public RepeatEndStep(final Traversal.Admin traversal) {
super(traversal);
@@ -232,18 +232,19 @@ public final class RepeatStep<S> extends ComputerAwareStep<S, S> implements Trav
@Override
protected Iterator<Traverser<S>> standardAlgorithm() throws NoSuchElementException {
+ final RepeatStep<S> repeatStep = (RepeatStep<S>) this.getTraversal().getParent();
while (true) {
final Traverser.Admin<S> start = this.starts.next();
start.incrLoops(this.getId());
- if (doUntil(start, false)) {
+ if (repeatStep.doUntil(start, false)) {
start.resetLoops();
return IteratorUtils.of(start);
} else {
- if (!RepeatStep.this.untilFirst && !RepeatStep.this.emitFirst)
- RepeatStep.this.repeatTraversal.addStart(start);
+ if (!repeatStep.untilFirst && !repeatStep.emitFirst)
+ repeatStep.repeatTraversal.addStart(start);
else
- RepeatStep.this.addStart(start);
- if (doEmit(start, false)) {
+ repeatStep.addStart(start);
+ if (repeatStep.doEmit(start, false)) {
final Traverser.Admin<S> emitSplit = start.split();
emitSplit.resetLoops();
return IteratorUtils.of(emitSplit);
@@ -254,19 +255,20 @@ public final class RepeatStep<S> extends ComputerAwareStep<S, S> implements Trav
@Override
protected Iterator<Traverser<S>> computerAlgorithm() throws NoSuchElementException {
+ final RepeatStep<S> repeatStep = (RepeatStep<S>) this.getTraversal().getParent();
final Traverser.Admin<S> start = this.starts.next();
- start.incrLoops(RepeatStep.this.getId());
- if (doUntil(start, false)) {
+ start.incrLoops(repeatStep.getId());
+ if (repeatStep.doUntil(start, false)) {
start.resetLoops();
- start.setStepId(RepeatStep.this.getNextStep().getId());
- start.addLabels(RepeatStep.this.labels);
+ start.setStepId(repeatStep.getNextStep().getId());
+ start.addLabels(repeatStep.labels);
return IteratorUtils.of(start);
} else {
- start.setStepId(RepeatStep.this.getId());
- if (doEmit(start, false)) {
+ start.setStepId(repeatStep.getId());
+ if (repeatStep.doEmit(start, false)) {
final Traverser.Admin<S> emitSplit = start.split();
emitSplit.resetLoops();
- emitSplit.setStepId(RepeatStep.this.getNextStep().getId());
+ emitSplit.setStepId(repeatStep.getNextStep().getId());
return IteratorUtils.of(start, emitSplit);
}
return IteratorUtils.of(start);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java
index 3b4c709..5742966 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java
@@ -32,8 +32,7 @@ import java.util.List;
*/
public final class TraversalRing<A, B> implements Serializable, Cloneable {
- private final IdentityTraversal<A, B> identityTraversal = new IdentityTraversal<>();
-
+ private IdentityTraversal<A, B> identityTraversal = new IdentityTraversal<>();
private List<Traversal.Admin<A, B>> traversals = new ArrayList<>();
private int currentTraversal = -1;
@@ -80,6 +79,7 @@ public final class TraversalRing<A, B> implements Serializable, Cloneable {
try {
final TraversalRing<A, B> clone = (TraversalRing<A, B>) super.clone();
clone.traversals = new ArrayList<>();
+ clone.identityTraversal = new IdentityTraversal<>();
for (final Traversal.Admin<A, B> traversal : this.traversals) {
clone.addTraversal(traversal.clone());
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 2c57e07..2e3a3d8 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -69,7 +69,7 @@ import static org.junit.Assert.*;
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public class GraphComputerTest extends AbstractGremlinProcessTest {
- /* @Test
+ @Test
@LoadGraphWith(MODERN)
public void shouldHaveStandardStringRepresentation() {
final GraphComputer computer = graph.compute(graphComputerClass.get());
@@ -1039,12 +1039,9 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Test
@LoadGraphWith(MODERN)
public void shouldStartAndEndWorkersForVertexProgramAndMapReduce() throws Exception {
- VertexProgramJ.TIME_KEEPER.set(-1l);
- MapReduceI.TIME_KEEPER.set(-1l);
MapReduceI.WORKER_START.clear();
MapReduceI.WORKER_END.clear();
assertEquals(3, graph.compute(graphComputerClass.get()).program(new VertexProgramJ()).mapReduce(new MapReduceI()).submit().get().memory().<Integer>get("a").intValue());
- assertEquals(Long.MIN_VALUE, VertexProgramJ.TIME_KEEPER.get());
if (MapReduceI.WORKER_START.size() == 2) {
assertEquals(2, MapReduceI.WORKER_START.size());
assertTrue(MapReduceI.WORKER_START.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_START.contains(MapReduce.Stage.REDUCE));
@@ -1063,7 +1060,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
public static class VertexProgramJ extends StaticVertexProgram {
- private static final AtomicLong TIME_KEEPER = new AtomicLong(-1l);
@Override
public void setup(final Memory memory) {
@@ -1073,10 +1069,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void workerIterationStart(final Memory memory) {
assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
- final long time = System.nanoTime();
- if (!memory.isInitialIteration())
- assertNotEquals(-1l, TIME_KEEPER.get());
- assertTrue(TIME_KEEPER.getAndSet(time) <= time);
try {
memory.set("test", memory.getIteration());
fail("Should throw an immutable memory exception");
@@ -1089,9 +1081,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
public void execute(Vertex vertex, Messenger messenger, Memory memory) {
assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
memory.set("test", memory.getIteration() + 1);
- sleep(10);
- assertNotEquals(-1l, TIME_KEEPER.get());
- assertTrue(TIME_KEEPER.get() <= System.nanoTime());
}
@Override
@@ -1102,8 +1091,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void workerIterationEnd(final Memory memory) {
assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
- assertNotEquals(-1l, TIME_KEEPER.get());
- assertTrue(TIME_KEEPER.getAndSet(Long.MIN_VALUE) <= System.nanoTime());
try {
memory.set("test", memory.getIteration());
fail("Should throw an immutable memory exception");
@@ -1135,7 +1122,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
private static class MapReduceI extends StaticMapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
- private static final AtomicLong TIME_KEEPER = new AtomicLong(-1l);
private static final Set<Stage> WORKER_START = new ConcurrentSkipListSet<>();
private static final Set<Stage> WORKER_END = new ConcurrentSkipListSet<>();
@@ -1146,9 +1132,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void workerStart(final Stage stage) {
- final long time = System.nanoTime();
- if (!stage.equals(Stage.MAP)) assertNotEquals(-1l, TIME_KEEPER.get());
- assertTrue(TIME_KEEPER.getAndSet(time) <= time);
WORKER_START.add(stage);
if (!stage.equals(Stage.MAP))
assertFalse(WORKER_END.isEmpty());
@@ -1157,10 +1140,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
emitter.emit(1);
- sleep(10);
- assertNotEquals(-1l, TIME_KEEPER.get());
- final long time = System.nanoTime();
- assertTrue(TIME_KEEPER.getAndSet(time) <= time);
assertEquals(1, WORKER_START.size());
assertTrue(WORKER_START.contains(Stage.MAP));
}
@@ -1168,10 +1147,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
emitter.emit(2);
- sleep(10);
- assertNotEquals(-1l, TIME_KEEPER.get());
- final long time = System.nanoTime();
- assertTrue(TIME_KEEPER.getAndSet(time) <= time);
assertEquals(2, WORKER_START.size());
assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.COMBINE));
assertFalse(WORKER_END.isEmpty());
@@ -1180,10 +1155,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
emitter.emit(3);
- sleep(10);
- assertNotEquals(-1l, TIME_KEEPER.get());
- final long time = System.nanoTime();
- assertTrue(TIME_KEEPER.getAndSet(time) <= time);
if (WORKER_START.size() == 2) {
assertEquals(2, WORKER_START.size());
assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.REDUCE));
@@ -1196,9 +1167,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void workerEnd(final Stage stage) {
- assertNotEquals(-1l, TIME_KEEPER.get());
- final long time = System.nanoTime();
- assertTrue(TIME_KEEPER.get() <= time);
assertFalse(WORKER_START.isEmpty());
if (!stage.equals(Stage.MAP))
assertFalse(WORKER_END.isEmpty());
@@ -1217,16 +1185,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
}
}
- /////////////////////////////////////////////////
-
- private static void sleep(final long time) {
- try {
- Thread.sleep(time);
- } catch (InterruptedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
/////////////////////////////////////////////
/////////////////////////////////////////////
@@ -1407,7 +1365,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.EDGES;
}
- }*/
+ }
/////////////////////////////////////////////
@@ -1435,10 +1393,14 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
- if(!this.threadIds.contains(Thread.currentThread().getName())) {
+ try {
+ Thread.sleep(1);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ if (!this.threadIds.contains(Thread.currentThread().getName())) {
memory.incr("workerCount", 1l);
this.threadIds.add(Thread.currentThread().getName());
- System.out.println(this.threadIds);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java
index 5fec094..5d2ed95 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java
@@ -337,7 +337,7 @@ public abstract class MatchTest extends AbstractGremlinProcessTest {
@Test
@LoadGraphWith(MODERN)
- public void g_V_matchXa_created_b__b_0created_cX_whereXa_neq_cX_selectXa_c_nameX() throws Exception {
+ public void g_V_matchXa_created_b__b_0created_cX_whereXa_neq_cX_selectXa_cX() throws Exception {
final Traversal<Vertex, Map<String, Vertex>> traversal = get_g_V_matchXa_created_b__b_0created_cX_whereXa_neq_cX_selectXa_cX();
checkResults(makeMapList(2,
"a", convertToVertex(graph, "marko"), "c", convertToVertex(graph, "josh"),
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 9840d8a..c88fb4c 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -131,7 +131,7 @@ public final class TinkerGraphComputer implements GraphComputer {
final Vertex vertex = vertices.next();
if (null == vertex) break;
vertexProgram.execute(
- ComputerGraph.vertexProgram(vertex, this.vertexProgram),
+ ComputerGraph.vertexProgram(vertex, vertexProgram),
new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
this.memory
);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
index 8143da3..d0c302a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
@@ -93,8 +93,11 @@ public final class TinkerMessenger<M> implements Messenger<M> {
}
private void addMessage(final Vertex vertex, final M message) {
- final Queue<M> queue = this.messageBoard.sendMessages.computeIfAbsent(vertex, v -> new ConcurrentLinkedQueue<>());
- queue.add(null != this.combiner && !queue.isEmpty() ? this.combiner.combine(queue.remove(), message) : message);
+ this.messageBoard.sendMessages.compute(vertex, (v,queue) -> {
+ if(null == queue) queue = new ConcurrentLinkedQueue<>();
+ queue.add(null != this.combiner && !queue.isEmpty() ? this.combiner.combine(queue.remove(), message) : message);
+ return queue;
+ });
}
///////////
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index c6d3f11..e9341b4 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -24,12 +24,10 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.function.Consumer;
/**
@@ -38,17 +36,19 @@ import java.util.function.Consumer;
*/
public final class TinkerWorkerPool implements AutoCloseable {
- private static final BasicThreadFactory threadFactoryWorker = new BasicThreadFactory.Builder().namingPattern("tinker-worker-%d").build();
+ private static final BasicThreadFactory THREAD_FACTORY_WORKER = new BasicThreadFactory.Builder().namingPattern("tinker-worker-%d").build();
private final int numberOfWorkers;
private final ExecutorService workerPool;
+ private final CompletionService<Object> completionService;
private VertexProgramPool vertexProgramPool;
private MapReducePool mapReducePool;
public TinkerWorkerPool(final int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
- workerPool = Executors.newFixedThreadPool(numberOfWorkers, threadFactoryWorker);
+ this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
+ this.completionService = new ExecutorCompletionService<>(this.workerPool);
}
public void setVertexProgram(final VertexProgram vertexProgram) {
@@ -60,33 +60,35 @@ public final class TinkerWorkerPool implements AutoCloseable {
}
public void executeVertexProgram(final Consumer<VertexProgram> worker) {
- final List<Callable<Object>> tasks = new ArrayList<>();
- for (int i = 0; i < 1; i++) {
- tasks.add(() -> {
+ for (int i = 0; i < this.numberOfWorkers; i++) {
+ this.completionService.submit(() -> {
final VertexProgram vp = this.vertexProgramPool.take();
worker.accept(vp);
this.vertexProgramPool.offer(vp);
return null;
});
}
- try {
- final List<Future<Object>> futures = this.workerPool.invokeAll(tasks);
- for(Future future : futures) {
- future.get();
- }
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
+ for (int i = 0; i < this.numberOfWorkers; i++) {
+ try {
+ this.completionService.take().get();
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
}
}
public void executeMapReduce(final Consumer<MapReduce> worker) {
for (int i = 0; i < this.numberOfWorkers; i++) {
+ this.completionService.submit(() -> {
+ final MapReduce mr = this.mapReducePool.take();
+ worker.accept(mr);
+ this.mapReducePool.offer(mr);
+ return null;
+ });
+ }
+ for (int i = 0; i < this.numberOfWorkers; i++) {
try {
- this.workerPool.submit(() -> {
- final MapReduce mr = this.mapReducePool.take();
- worker.accept(mr);
- this.mapReducePool.offer(mr);
- }).get();
+ this.completionService.take().get();
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -95,6 +97,6 @@ public final class TinkerWorkerPool implements AutoCloseable {
@Override
public void close() throws Exception {
- workerPool.shutdown();
+ this.workerPool.shutdown();
}
}
\ No newline at end of file