You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/20 22:20:07 UTC

[1/2] incubator-tinkerpop git commit: Refactored the TinkerWorkerPool to re-use threads with ExecutorService.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master d5f6702dc -> fef1f787e


Refactored the TinkerWorkerPool to re-use threads with ExecutorService.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/61e322b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/61e322b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/61e322b0

Branch: refs/heads/master
Commit: 61e322b0653acb57f7b4d4fefe712c2f3920e581
Parents: 26cc35e
Author: Stephen Mallette <sp...@apache.org>
Authored: Fri Feb 20 16:19:26 2015 -0500
Committer: Stephen Mallette <sp...@apache.org>
Committed: Fri Feb 20 16:19:26 2015 -0500

----------------------------------------------------------------------
 .../process/computer/TinkerGraphComputer.java   | 93 +++++++++++---------
 .../process/computer/TinkerWorkerPool.java      | 67 ++++++++------
 2 files changed, 88 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/61e322b0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index a466bb8..4132dd9 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Future;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public class TinkerGraphComputer implements GraphComputer {
 
@@ -101,61 +102,67 @@ public class TinkerGraphComputer implements GraphComputer {
                 // execute the vertex program
                 this.vertexProgram.setup(this.memory);
                 this.memory.completeSubRound();
-                final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), this.vertexProgram);
-                while (true) {
-                    workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationStart(this.memory.asImmutable()));
-                    final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(sg.iterators().vertexIterator());
-                    workers.executeVertexProgram(vertexProgram -> {
-                        while (true) {
-                            final Vertex vertex = vertices.next();
-                            if (null == vertex) return;
-                            vertexProgram.execute(vertex, new TinkerMessenger(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), this.memory);
-                        }
-                    });
-                    workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationEnd(this.memory.asImmutable()));
-                    this.messageBoard.completeIteration();
-                    this.memory.completeSubRound();
-                    if (this.vertexProgram.terminate(this.memory)) {
-                        this.memory.incrIteration();
-                        this.memory.completeSubRound();
-                        break;
-                    } else {
-                        this.memory.incrIteration();
+                try (final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), this.vertexProgram)) {
+                    while (true) {
+                        workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationStart(this.memory.asImmutable()));
+                        final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(sg.iterators().vertexIterator());
+                        workers.executeVertexProgram(vertexProgram -> {
+                            while (true) {
+                                final Vertex vertex = vertices.next();
+                                if (null == vertex) return;
+                                vertexProgram.execute(vertex, new TinkerMessenger(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), this.memory);
+                            }
+                        });
+                        workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationEnd(this.memory.asImmutable()));
+                        this.messageBoard.completeIteration();
                         this.memory.completeSubRound();
+                        if (this.vertexProgram.terminate(this.memory)) {
+                            this.memory.incrIteration();
+                            this.memory.completeSubRound();
+                            break;
+                        } else {
+                            this.memory.incrIteration();
+                            this.memory.completeSubRound();
+                        }
                     }
+                } catch (Exception ex) {
+                    throw new RuntimeException(ex);
                 }
             }
 
             // execute mapreduce jobs
             for (final MapReduce mapReduce : this.mapReducers) {
-                final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), mapReduce);
-                if (mapReduce.doStage(MapReduce.Stage.MAP)) {
-                    final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
-                    final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(sg.iterators().vertexIterator());
-                    workers.executeMapReduce(workerMapReduce -> {
-                        while (true) {
-                            final Vertex vertex = vertices.next();
-                            if (null == vertex) return;
-                            workerMapReduce.map(vertex, mapEmitter);
-                        }
-                    });
-                    mapEmitter.complete(mapReduce); // sort results if a map output sort is defined
-                    // no need to run combiners as this is single machine
-                    if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
-                        final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
-                        final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
+                try (final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), mapReduce)) {
+                    if (mapReduce.doStage(MapReduce.Stage.MAP)) {
+                        final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
+                        final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(sg.iterators().vertexIterator());
                         workers.executeMapReduce(workerMapReduce -> {
                             while (true) {
-                                final Map.Entry<?, Queue<?>> entry = keyValues.next();
-                                if (null == entry) return;
-                                workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
+                                final Vertex vertex = vertices.next();
+                                if (null == vertex) return;
+                                workerMapReduce.map(vertex, mapEmitter);
                             }
                         });
-                        reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
-                        mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
-                    } else {
-                        mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
+                        mapEmitter.complete(mapReduce); // sort results if a map output sort is defined
+                        // no need to run combiners as this is single machine
+                        if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
+                            final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
+                            final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
+                            workers.executeMapReduce(workerMapReduce -> {
+                                while (true) {
+                                    final Map.Entry<?, Queue<?>> entry = keyValues.next();
+                                    if (null == entry) return;
+                                    workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
+                                }
+                            });
+                            reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
+                            mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
+                        } else {
+                            mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
+                        }
                     }
+                } catch (Exception ex) {
+                    throw new RuntimeException(ex);
                 }
             }
             // update runtime and return the newly computed graph

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/61e322b0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 15a8011..d418d4a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -18,32 +18,46 @@
  */
 package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public class TinkerWorkerPool {
+public class TinkerWorkerPool implements AutoCloseable {
 
+    private static final BasicThreadFactory threadFactoryWorker = new BasicThreadFactory.Builder().namingPattern("tinker-worker-%d").build();
     public static enum State {VERTEX_PROGRAM, MAP_REDUCE}
 
-    private List<MapReduce> mapReducers;
-    private List<VertexProgram> vertexPrograms;
-    private State state;
+    private final List<MapReduce> mapReducers;
+    private final List<VertexProgram> vertexPrograms;
+    private final State state;
+
+    private final ExecutorService workerPool;
 
     public TinkerWorkerPool(final int numberOfWorkers, final VertexProgram vertexProgram) {
         try {
             this.state = State.VERTEX_PROGRAM;
             this.vertexPrograms = new ArrayList<>(numberOfWorkers);
+            this.mapReducers = Collections.emptyList();
             for (int i = 0; i < numberOfWorkers; i++) {
                 this.vertexPrograms.add(vertexProgram.clone());
             }
+
+            workerPool = Executors.newFixedThreadPool(numberOfWorkers, threadFactoryWorker);
+
         } catch (final CloneNotSupportedException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -52,10 +66,14 @@ public class TinkerWorkerPool {
     public TinkerWorkerPool(final int numberOfWorkers, final MapReduce mapReduce) {
         try {
             this.state = State.MAP_REDUCE;
+            this.vertexPrograms = Collections.emptyList();
             this.mapReducers = new ArrayList<>(numberOfWorkers);
             for (int i = 0; i < numberOfWorkers; i++) {
                 this.mapReducers.add(mapReduce.clone());
             }
+
+            workerPool = Executors.newFixedThreadPool(numberOfWorkers, threadFactoryWorker);
+
         } catch (final CloneNotSupportedException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -64,34 +82,25 @@ public class TinkerWorkerPool {
     public void executeVertexProgram(final Consumer<VertexProgram> worker) {
         if (!this.state.equals(State.VERTEX_PROGRAM))
             throw new IllegalStateException("The provided TinkerWorkerPool is not setup for VertexProgram: " + this.state);
-        final CountDownLatch activeWorkers = new CountDownLatch(this.vertexPrograms.size());
-        for (final VertexProgram vertexProgram : this.vertexPrograms) {
-            new Thread(() -> {
-                worker.accept(vertexProgram);
-                activeWorkers.countDown();
-            }).start();
-        }
-        try {
-            activeWorkers.await();
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
+
+        this.vertexPrograms.stream()
+                .map(vp -> (Future<Void>) workerPool.submit(() -> worker.accept(vp)))
+                .collect(Collectors.toList())
+                .forEach(FunctionUtils.wrapConsumer(Future::get));
     }
 
     public void executeMapReduce(final Consumer<MapReduce> worker) {
         if (!this.state.equals(State.MAP_REDUCE))
             throw new IllegalStateException("The provided TinkerWorkerPool is not setup for MapReduce: " + this.state);
-        final CountDownLatch activeWorkers = new CountDownLatch(this.mapReducers.size());
-        for (final MapReduce mapReduce : this.mapReducers) {
-            new Thread(() -> {
-                worker.accept(mapReduce);
-                activeWorkers.countDown();
-            }).start();
-        }
-        try {
-            activeWorkers.await();
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
+
+        this.mapReducers.stream()
+                .map(mr -> (Future<Void>) workerPool.submit(() -> worker.accept(mr)))
+                .collect(Collectors.toList())
+                .forEach(FunctionUtils.wrapConsumer(Future::get));
+    }
+
+    @Override
+    public void close() throws Exception {
+        workerPool.shutdown();
     }
 }
\ No newline at end of file


[2/2] incubator-tinkerpop git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-tinkerpop

Posted by sp...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-tinkerpop


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fef1f787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fef1f787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fef1f787

Branch: refs/heads/master
Commit: fef1f787edda72a1021bb634528203cf30da70e2
Parents: 61e322b d5f6702
Author: Stephen Mallette <sp...@apache.org>
Authored: Fri Feb 20 16:19:58 2015 -0500
Committer: Stephen Mallette <sp...@apache.org>
Committed: Fri Feb 20 16:19:58 2015 -0500

----------------------------------------------------------------------
 docs/src/the-traversal.asciidoc | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------