You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/30 04:16:45 UTC

incubator-tinkerpop git commit: I bare gifts to all you peasants. TinkerGraphComputer had a horribly stupid 'i can't believe we didn't realize it till now' threading issue. The fix exposed then too clone() errors in the step library (RepeatStep and Trave

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/thread-issue-tinkergraph 18e5ea9fe -> c160c3310


I bare gifts to all you peasants. TinkerGraphComputer had a horribly stupid 'i can't believe we didn't realize it till now' threading issue. The fix exposed then too clone() errors in the step library (RepeatStep and TraversalRing). There is a new GraphComptuerTest that tests worker spawning. Note that GraphComputer.workers(int) exists in this branch. Anywho, by way of China, we are solving TINKERPOP3-391.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c160c331
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c160c331
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c160c331

Branch: refs/heads/thread-issue-tinkergraph
Commit: c160c3310dd6ebbe62783cf886d0f267b1a92119
Parents: 18e5ea9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 29 20:16:34 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 29 20:16:34 2015 -0600

----------------------------------------------------------------------
 .../traversal/step/branch/RepeatStep.java       | 28 +++++-----
 .../process/traversal/util/TraversalRing.java   |  4 +-
 .../process/computer/GraphComputerTest.java     | 54 +++-----------------
 .../process/traversal/step/map/MatchTest.java   |  2 +-
 .../process/computer/TinkerGraphComputer.java   |  2 +-
 .../process/computer/TinkerMessenger.java       |  7 ++-
 .../process/computer/TinkerWorkerPool.java      | 46 +++++++++--------
 7 files changed, 56 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
index f9e1df6..d10c81f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
@@ -224,7 +224,7 @@ public final class RepeatStep<S> extends ComputerAwareStep<S, S> implements Trav
 
     ///////////////////////////////////
 
-    public class RepeatEndStep extends ComputerAwareStep<S, S> {
+    public static class RepeatEndStep<S> extends ComputerAwareStep<S, S> {
 
         public RepeatEndStep(final Traversal.Admin traversal) {
             super(traversal);
@@ -232,18 +232,19 @@ public final class RepeatStep<S> extends ComputerAwareStep<S, S> implements Trav
 
         @Override
         protected Iterator<Traverser<S>> standardAlgorithm() throws NoSuchElementException {
+            final RepeatStep<S> repeatStep = (RepeatStep<S>) this.getTraversal().getParent();
             while (true) {
                 final Traverser.Admin<S> start = this.starts.next();
                 start.incrLoops(this.getId());
-                if (doUntil(start, false)) {
+                if (repeatStep.doUntil(start, false)) {
                     start.resetLoops();
                     return IteratorUtils.of(start);
                 } else {
-                    if (!RepeatStep.this.untilFirst && !RepeatStep.this.emitFirst)
-                        RepeatStep.this.repeatTraversal.addStart(start);
+                    if (!repeatStep.untilFirst && !repeatStep.emitFirst)
+                        repeatStep.repeatTraversal.addStart(start);
                     else
-                        RepeatStep.this.addStart(start);
-                    if (doEmit(start, false)) {
+                        repeatStep.addStart(start);
+                    if (repeatStep.doEmit(start, false)) {
                         final Traverser.Admin<S> emitSplit = start.split();
                         emitSplit.resetLoops();
                         return IteratorUtils.of(emitSplit);
@@ -254,19 +255,20 @@ public final class RepeatStep<S> extends ComputerAwareStep<S, S> implements Trav
 
         @Override
         protected Iterator<Traverser<S>> computerAlgorithm() throws NoSuchElementException {
+            final RepeatStep<S> repeatStep = (RepeatStep<S>) this.getTraversal().getParent();
             final Traverser.Admin<S> start = this.starts.next();
-            start.incrLoops(RepeatStep.this.getId());
-            if (doUntil(start, false)) {
+            start.incrLoops(repeatStep.getId());
+            if (repeatStep.doUntil(start, false)) {
                 start.resetLoops();
-                start.setStepId(RepeatStep.this.getNextStep().getId());
-                start.addLabels(RepeatStep.this.labels);
+                start.setStepId(repeatStep.getNextStep().getId());
+                start.addLabels(repeatStep.labels);
                 return IteratorUtils.of(start);
             } else {
-                start.setStepId(RepeatStep.this.getId());
-                if (doEmit(start, false)) {
+                start.setStepId(repeatStep.getId());
+                if (repeatStep.doEmit(start, false)) {
                     final Traverser.Admin<S> emitSplit = start.split();
                     emitSplit.resetLoops();
-                    emitSplit.setStepId(RepeatStep.this.getNextStep().getId());
+                    emitSplit.setStepId(repeatStep.getNextStep().getId());
                     return IteratorUtils.of(start, emitSplit);
                 }
                 return IteratorUtils.of(start);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java
index 3b4c709..5742966 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalRing.java
@@ -32,8 +32,7 @@ import java.util.List;
  */
 public final class TraversalRing<A, B> implements Serializable, Cloneable {
 
-    private final IdentityTraversal<A, B> identityTraversal = new IdentityTraversal<>();
-
+    private IdentityTraversal<A, B> identityTraversal = new IdentityTraversal<>();
     private List<Traversal.Admin<A, B>> traversals = new ArrayList<>();
     private int currentTraversal = -1;
 
@@ -80,6 +79,7 @@ public final class TraversalRing<A, B> implements Serializable, Cloneable {
         try {
             final TraversalRing<A, B> clone = (TraversalRing<A, B>) super.clone();
             clone.traversals = new ArrayList<>();
+            clone.identityTraversal = new IdentityTraversal<>();
             for (final Traversal.Admin<A, B> traversal : this.traversals) {
                 clone.addTraversal(traversal.clone());
             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 2c57e07..2e3a3d8 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -69,7 +69,7 @@ import static org.junit.Assert.*;
 @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
 public class GraphComputerTest extends AbstractGremlinProcessTest {
 
-   /* @Test
+    @Test
     @LoadGraphWith(MODERN)
     public void shouldHaveStandardStringRepresentation() {
         final GraphComputer computer = graph.compute(graphComputerClass.get());
@@ -1039,12 +1039,9 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
     @Test
     @LoadGraphWith(MODERN)
     public void shouldStartAndEndWorkersForVertexProgramAndMapReduce() throws Exception {
-        VertexProgramJ.TIME_KEEPER.set(-1l);
-        MapReduceI.TIME_KEEPER.set(-1l);
         MapReduceI.WORKER_START.clear();
         MapReduceI.WORKER_END.clear();
         assertEquals(3, graph.compute(graphComputerClass.get()).program(new VertexProgramJ()).mapReduce(new MapReduceI()).submit().get().memory().<Integer>get("a").intValue());
-        assertEquals(Long.MIN_VALUE, VertexProgramJ.TIME_KEEPER.get());
         if (MapReduceI.WORKER_START.size() == 2) {
             assertEquals(2, MapReduceI.WORKER_START.size());
             assertTrue(MapReduceI.WORKER_START.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_START.contains(MapReduce.Stage.REDUCE));
@@ -1063,7 +1060,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
     public static class VertexProgramJ extends StaticVertexProgram {
 
-        private static final AtomicLong TIME_KEEPER = new AtomicLong(-1l);
 
         @Override
         public void setup(final Memory memory) {
@@ -1073,10 +1069,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public void workerIterationStart(final Memory memory) {
             assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
-            final long time = System.nanoTime();
-            if (!memory.isInitialIteration())
-                assertNotEquals(-1l, TIME_KEEPER.get());
-            assertTrue(TIME_KEEPER.getAndSet(time) <= time);
             try {
                 memory.set("test", memory.getIteration());
                 fail("Should throw an immutable memory exception");
@@ -1089,9 +1081,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         public void execute(Vertex vertex, Messenger messenger, Memory memory) {
             assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
             memory.set("test", memory.getIteration() + 1);
-            sleep(10);
-            assertNotEquals(-1l, TIME_KEEPER.get());
-            assertTrue(TIME_KEEPER.get() <= System.nanoTime());
         }
 
         @Override
@@ -1102,8 +1091,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public void workerIterationEnd(final Memory memory) {
             assertEquals(memory.getIteration(), memory.<Integer>get("test").intValue());
-            assertNotEquals(-1l, TIME_KEEPER.get());
-            assertTrue(TIME_KEEPER.getAndSet(Long.MIN_VALUE) <= System.nanoTime());
             try {
                 memory.set("test", memory.getIteration());
                 fail("Should throw an immutable memory exception");
@@ -1135,7 +1122,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
     private static class MapReduceI extends StaticMapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
 
-        private static final AtomicLong TIME_KEEPER = new AtomicLong(-1l);
         private static final Set<Stage> WORKER_START = new ConcurrentSkipListSet<>();
         private static final Set<Stage> WORKER_END = new ConcurrentSkipListSet<>();
 
@@ -1146,9 +1132,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public void workerStart(final Stage stage) {
-            final long time = System.nanoTime();
-            if (!stage.equals(Stage.MAP)) assertNotEquals(-1l, TIME_KEEPER.get());
-            assertTrue(TIME_KEEPER.getAndSet(time) <= time);
             WORKER_START.add(stage);
             if (!stage.equals(Stage.MAP))
                 assertFalse(WORKER_END.isEmpty());
@@ -1157,10 +1140,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
             emitter.emit(1);
-            sleep(10);
-            assertNotEquals(-1l, TIME_KEEPER.get());
-            final long time = System.nanoTime();
-            assertTrue(TIME_KEEPER.getAndSet(time) <= time);
             assertEquals(1, WORKER_START.size());
             assertTrue(WORKER_START.contains(Stage.MAP));
         }
@@ -1168,10 +1147,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
             emitter.emit(2);
-            sleep(10);
-            assertNotEquals(-1l, TIME_KEEPER.get());
-            final long time = System.nanoTime();
-            assertTrue(TIME_KEEPER.getAndSet(time) <= time);
             assertEquals(2, WORKER_START.size());
             assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.COMBINE));
             assertFalse(WORKER_END.isEmpty());
@@ -1180,10 +1155,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
             emitter.emit(3);
-            sleep(10);
-            assertNotEquals(-1l, TIME_KEEPER.get());
-            final long time = System.nanoTime();
-            assertTrue(TIME_KEEPER.getAndSet(time) <= time);
             if (WORKER_START.size() == 2) {
                 assertEquals(2, WORKER_START.size());
                 assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.REDUCE));
@@ -1196,9 +1167,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public void workerEnd(final Stage stage) {
-            assertNotEquals(-1l, TIME_KEEPER.get());
-            final long time = System.nanoTime();
-            assertTrue(TIME_KEEPER.get() <= time);
             assertFalse(WORKER_START.isEmpty());
             if (!stage.equals(Stage.MAP))
                 assertFalse(WORKER_END.isEmpty());
@@ -1217,16 +1185,6 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
     }
 
-    /////////////////////////////////////////////////
-
-    private static void sleep(final long time) {
-        try {
-            Thread.sleep(time);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
     /////////////////////////////////////////////
 
     /////////////////////////////////////////////
@@ -1407,7 +1365,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         public GraphComputer.Persist getPreferredPersist() {
             return GraphComputer.Persist.EDGES;
         }
-    }*/
+    }
 
     /////////////////////////////////////////////
 
@@ -1435,10 +1393,14 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
-            if(!this.threadIds.contains(Thread.currentThread().getName())) {
+            try {
+                Thread.sleep(1);
+            } catch (Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+            if (!this.threadIds.contains(Thread.currentThread().getName())) {
                 memory.incr("workerCount", 1l);
                 this.threadIds.add(Thread.currentThread().getName());
-                System.out.println(this.threadIds);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java
index 5fec094..5d2ed95 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MatchTest.java
@@ -337,7 +337,7 @@ public abstract class MatchTest extends AbstractGremlinProcessTest {
 
     @Test
     @LoadGraphWith(MODERN)
-    public void g_V_matchXa_created_b__b_0created_cX_whereXa_neq_cX_selectXa_c_nameX() throws Exception {
+    public void g_V_matchXa_created_b__b_0created_cX_whereXa_neq_cX_selectXa_cX() throws Exception {
         final Traversal<Vertex, Map<String, Vertex>> traversal = get_g_V_matchXa_created_b__b_0created_cX_whereXa_neq_cX_selectXa_cX();
         checkResults(makeMapList(2,
                 "a", convertToVertex(graph, "marko"), "c", convertToVertex(graph, "josh"),

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 9840d8a..c88fb4c 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -131,7 +131,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                                 final Vertex vertex = vertices.next();
                                 if (null == vertex) break;
                                 vertexProgram.execute(
-                                        ComputerGraph.vertexProgram(vertex, this.vertexProgram),
+                                        ComputerGraph.vertexProgram(vertex, vertexProgram),
                                         new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
                                         this.memory
                                 );

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
index 8143da3..d0c302a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
@@ -93,8 +93,11 @@ public final class TinkerMessenger<M> implements Messenger<M> {
     }
 
     private void addMessage(final Vertex vertex, final M message) {
-        final Queue<M> queue = this.messageBoard.sendMessages.computeIfAbsent(vertex, v -> new ConcurrentLinkedQueue<>());
-        queue.add(null != this.combiner && !queue.isEmpty() ? this.combiner.combine(queue.remove(), message) : message);
+        this.messageBoard.sendMessages.compute(vertex, (v,queue) -> {
+            if(null == queue) queue = new ConcurrentLinkedQueue<>();
+            queue.add(null != this.combiner && !queue.isEmpty() ? this.combiner.combine(queue.remove(), message) : message);
+            return queue;
+        });
     }
 
     ///////////

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c160c331/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index c6d3f11..e9341b4 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -24,12 +24,10 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.function.Consumer;
 
 /**
@@ -38,17 +36,19 @@ import java.util.function.Consumer;
  */
 public final class TinkerWorkerPool implements AutoCloseable {
 
-    private static final BasicThreadFactory threadFactoryWorker = new BasicThreadFactory.Builder().namingPattern("tinker-worker-%d").build();
+    private static final BasicThreadFactory THREAD_FACTORY_WORKER = new BasicThreadFactory.Builder().namingPattern("tinker-worker-%d").build();
 
     private final int numberOfWorkers;
     private final ExecutorService workerPool;
+    private final CompletionService<Object> completionService;
 
     private VertexProgramPool vertexProgramPool;
     private MapReducePool mapReducePool;
 
     public TinkerWorkerPool(final int numberOfWorkers) {
         this.numberOfWorkers = numberOfWorkers;
-        workerPool = Executors.newFixedThreadPool(numberOfWorkers, threadFactoryWorker);
+        this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
+        this.completionService = new ExecutorCompletionService<>(this.workerPool);
     }
 
     public void setVertexProgram(final VertexProgram vertexProgram) {
@@ -60,33 +60,35 @@ public final class TinkerWorkerPool implements AutoCloseable {
     }
 
     public void executeVertexProgram(final Consumer<VertexProgram> worker) {
-        final List<Callable<Object>> tasks = new ArrayList<>();
-        for (int i = 0; i < 1; i++) {
-            tasks.add(() -> {
+        for (int i = 0; i < this.numberOfWorkers; i++) {
+            this.completionService.submit(() -> {
                 final VertexProgram vp = this.vertexProgramPool.take();
                 worker.accept(vp);
                 this.vertexProgramPool.offer(vp);
                 return null;
             });
         }
-        try {
-            final List<Future<Object>> futures = this.workerPool.invokeAll(tasks);
-           for(Future future : futures) {
-               future.get();
-           }
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
+        for (int i = 0; i < this.numberOfWorkers; i++) {
+            try {
+                this.completionService.take().get();
+            } catch (final Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
         }
     }
 
     public void executeMapReduce(final Consumer<MapReduce> worker) {
         for (int i = 0; i < this.numberOfWorkers; i++) {
+            this.completionService.submit(() -> {
+                final MapReduce mr = this.mapReducePool.take();
+                worker.accept(mr);
+                this.mapReducePool.offer(mr);
+                return null;
+            });
+        }
+        for (int i = 0; i < this.numberOfWorkers; i++) {
             try {
-                this.workerPool.submit(() -> {
-                    final MapReduce mr = this.mapReducePool.take();
-                    worker.accept(mr);
-                    this.mapReducePool.offer(mr);
-                }).get();
+                this.completionService.take().get();
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
@@ -95,6 +97,6 @@ public final class TinkerWorkerPool implements AutoCloseable {
 
     @Override
     public void close() throws Exception {
-        workerPool.shutdown();
+        this.workerPool.shutdown();
     }
 }
\ No newline at end of file