You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/20 22:20:07 UTC
[1/2] incubator-tinkerpop git commit: Refactored the TinkerWorkerPool
to re-use threads with ExecutorService.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master d5f6702dc -> fef1f787e
Refactored the TinkerWorkerPool to re-use threads with ExecutorService.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/61e322b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/61e322b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/61e322b0
Branch: refs/heads/master
Commit: 61e322b0653acb57f7b4d4fefe712c2f3920e581
Parents: 26cc35e
Author: Stephen Mallette <sp...@apache.org>
Authored: Fri Feb 20 16:19:26 2015 -0500
Committer: Stephen Mallette <sp...@apache.org>
Committed: Fri Feb 20 16:19:26 2015 -0500
----------------------------------------------------------------------
.../process/computer/TinkerGraphComputer.java | 93 +++++++++++---------
.../process/computer/TinkerWorkerPool.java | 67 ++++++++------
2 files changed, 88 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/61e322b0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index a466bb8..4132dd9 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Future;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class TinkerGraphComputer implements GraphComputer {
@@ -101,61 +102,67 @@ public class TinkerGraphComputer implements GraphComputer {
// execute the vertex program
this.vertexProgram.setup(this.memory);
this.memory.completeSubRound();
- final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), this.vertexProgram);
- while (true) {
- workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationStart(this.memory.asImmutable()));
- final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(sg.iterators().vertexIterator());
- workers.executeVertexProgram(vertexProgram -> {
- while (true) {
- final Vertex vertex = vertices.next();
- if (null == vertex) return;
- vertexProgram.execute(vertex, new TinkerMessenger(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), this.memory);
- }
- });
- workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationEnd(this.memory.asImmutable()));
- this.messageBoard.completeIteration();
- this.memory.completeSubRound();
- if (this.vertexProgram.terminate(this.memory)) {
- this.memory.incrIteration();
- this.memory.completeSubRound();
- break;
- } else {
- this.memory.incrIteration();
+ try (final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), this.vertexProgram)) {
+ while (true) {
+ workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationStart(this.memory.asImmutable()));
+ final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(sg.iterators().vertexIterator());
+ workers.executeVertexProgram(vertexProgram -> {
+ while (true) {
+ final Vertex vertex = vertices.next();
+ if (null == vertex) return;
+ vertexProgram.execute(vertex, new TinkerMessenger(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), this.memory);
+ }
+ });
+ workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationEnd(this.memory.asImmutable()));
+ this.messageBoard.completeIteration();
this.memory.completeSubRound();
+ if (this.vertexProgram.terminate(this.memory)) {
+ this.memory.incrIteration();
+ this.memory.completeSubRound();
+ break;
+ } else {
+ this.memory.incrIteration();
+ this.memory.completeSubRound();
+ }
}
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
}
// execute mapreduce jobs
for (final MapReduce mapReduce : this.mapReducers) {
- final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), mapReduce);
- if (mapReduce.doStage(MapReduce.Stage.MAP)) {
- final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
- final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(sg.iterators().vertexIterator());
- workers.executeMapReduce(workerMapReduce -> {
- while (true) {
- final Vertex vertex = vertices.next();
- if (null == vertex) return;
- workerMapReduce.map(vertex, mapEmitter);
- }
- });
- mapEmitter.complete(mapReduce); // sort results if a map output sort is defined
- // no need to run combiners as this is single machine
- if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
- final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
- final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
+ try (final TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), mapReduce)) {
+ if (mapReduce.doStage(MapReduce.Stage.MAP)) {
+ final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
+ final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(sg.iterators().vertexIterator());
workers.executeMapReduce(workerMapReduce -> {
while (true) {
- final Map.Entry<?, Queue<?>> entry = keyValues.next();
- if (null == entry) return;
- workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
+ final Vertex vertex = vertices.next();
+ if (null == vertex) return;
+ workerMapReduce.map(vertex, mapEmitter);
}
});
- reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
- mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
- } else {
- mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
+ mapEmitter.complete(mapReduce); // sort results if a map output sort is defined
+ // no need to run combiners as this is single machine
+ if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
+ final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
+ final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
+ workers.executeMapReduce(workerMapReduce -> {
+ while (true) {
+ final Map.Entry<?, Queue<?>> entry = keyValues.next();
+ if (null == entry) return;
+ workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
+ }
+ });
+ reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
+ mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
+ } else {
+ mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
+ }
}
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
}
// update runtime and return the newly computed graph
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/61e322b0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 15a8011..d418d4a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -18,32 +18,46 @@
*/
package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class TinkerWorkerPool {
+public class TinkerWorkerPool implements AutoCloseable {
+ private static final BasicThreadFactory threadFactoryWorker = new BasicThreadFactory.Builder().namingPattern("tinker-worker-%d").build();
public static enum State {VERTEX_PROGRAM, MAP_REDUCE}
- private List<MapReduce> mapReducers;
- private List<VertexProgram> vertexPrograms;
- private State state;
+ private final List<MapReduce> mapReducers;
+ private final List<VertexProgram> vertexPrograms;
+ private final State state;
+
+ private final ExecutorService workerPool;
public TinkerWorkerPool(final int numberOfWorkers, final VertexProgram vertexProgram) {
try {
this.state = State.VERTEX_PROGRAM;
this.vertexPrograms = new ArrayList<>(numberOfWorkers);
+ this.mapReducers = Collections.emptyList();
for (int i = 0; i < numberOfWorkers; i++) {
this.vertexPrograms.add(vertexProgram.clone());
}
+
+ workerPool = Executors.newFixedThreadPool(numberOfWorkers, threadFactoryWorker);
+
} catch (final CloneNotSupportedException e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -52,10 +66,14 @@ public class TinkerWorkerPool {
public TinkerWorkerPool(final int numberOfWorkers, final MapReduce mapReduce) {
try {
this.state = State.MAP_REDUCE;
+ this.vertexPrograms = Collections.emptyList();
this.mapReducers = new ArrayList<>(numberOfWorkers);
for (int i = 0; i < numberOfWorkers; i++) {
this.mapReducers.add(mapReduce.clone());
}
+
+ workerPool = Executors.newFixedThreadPool(numberOfWorkers, threadFactoryWorker);
+
} catch (final CloneNotSupportedException e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -64,34 +82,25 @@ public class TinkerWorkerPool {
public void executeVertexProgram(final Consumer<VertexProgram> worker) {
if (!this.state.equals(State.VERTEX_PROGRAM))
throw new IllegalStateException("The provided TinkerWorkerPool is not setup for VertexProgram: " + this.state);
- final CountDownLatch activeWorkers = new CountDownLatch(this.vertexPrograms.size());
- for (final VertexProgram vertexProgram : this.vertexPrograms) {
- new Thread(() -> {
- worker.accept(vertexProgram);
- activeWorkers.countDown();
- }).start();
- }
- try {
- activeWorkers.await();
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+
+ this.vertexPrograms.stream()
+ .map(vp -> (Future<Void>) workerPool.submit(() -> worker.accept(vp)))
+ .collect(Collectors.toList())
+ .forEach(FunctionUtils.wrapConsumer(Future::get));
}
public void executeMapReduce(final Consumer<MapReduce> worker) {
if (!this.state.equals(State.MAP_REDUCE))
throw new IllegalStateException("The provided TinkerWorkerPool is not setup for MapReduce: " + this.state);
- final CountDownLatch activeWorkers = new CountDownLatch(this.mapReducers.size());
- for (final MapReduce mapReduce : this.mapReducers) {
- new Thread(() -> {
- worker.accept(mapReduce);
- activeWorkers.countDown();
- }).start();
- }
- try {
- activeWorkers.await();
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+
+ this.mapReducers.stream()
+ .map(mr -> (Future<Void>) workerPool.submit(() -> worker.accept(mr)))
+ .collect(Collectors.toList())
+ .forEach(FunctionUtils.wrapConsumer(Future::get));
+ }
+
+ @Override
+ public void close() throws Exception {
+ workerPool.shutdown();
}
}
\ No newline at end of file
[2/2] incubator-tinkerpop git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-tinkerpop
Posted by sp...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-tinkerpop
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fef1f787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fef1f787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fef1f787
Branch: refs/heads/master
Commit: fef1f787edda72a1021bb634528203cf30da70e2
Parents: 61e322b d5f6702
Author: Stephen Mallette <sp...@apache.org>
Authored: Fri Feb 20 16:19:58 2015 -0500
Committer: Stephen Mallette <sp...@apache.org>
Committed: Fri Feb 20 16:19:58 2015 -0500
----------------------------------------------------------------------
docs/src/the-traversal.asciidoc | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------