You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/23 06:28:46 UTC

[2/5] incubator-tinkerpop git commit: worked with @dkuppitz on a TinkerWorkerPool issue where startIteration and endIteration were not with their respective call thread. Exposed when doing bulk loading into Neo4j. Fix actually helped gut a bunch of code.

worked with @dkuppitz on a TinkerWorkerPool issue where startIteration and endIteration were not with their respective call thread. Exposed when doing bulk loading into Neo4j. Fix actually helped gut a bunch of code.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/1bfdb738
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/1bfdb738
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/1bfdb738

Branch: refs/heads/master
Commit: 1bfdb738d9f3f96a8019245403f5f2951db718a1
Parents: f09fd8b
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 22 17:16:36 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 22 17:16:36 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../process/computer/TinkerGraphComputer.java   | 19 +++++++++---------
 .../process/computer/TinkerWorkerPool.java      | 21 --------------------
 3 files changed, 10 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1bfdb738/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index d30fad5..ee7e7d9 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.0.2 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Fixed a thread context bug in `TinkerGraphComputer`.
 * Improved session closing for transactional graphs during shutdown of Gremlin Server.
 * Fixed id parameter used in tests for `GroovyStoreTest` and `GroovyRepeatTest` to not be treated as an embedded string.
 * `GraphStep` will convert any `Vertex` or `Edge` ids to their id `Object` prior to submission to `GraphComputer` (OLAP).

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1bfdb738/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index daaeb70..4d0e631 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -117,20 +117,20 @@ public final class TinkerGraphComputer implements GraphComputer {
                     this.memory.completeSubRound();
                     while (true) {
                         workers.setVertexProgram(this.vertexProgram);
-                        workers.vertexProgramWorkerIterationStart(this.memory.asImmutable());
                         final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
                         workers.executeVertexProgram(vertexProgram -> {
+                            vertexProgram.workerIterationStart(this.memory.asImmutable());
                             while (true) {
                                 final Vertex vertex = vertices.next();
-                                if (null == vertex) return;
+                                if (null == vertex) break;
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, this.vertexProgram),
                                         new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
                                         this.memory
                                 );
                             }
+                            vertexProgram.workerIterationEnd(this.memory.asImmutable());
                         });
-                        workers.vertexProgramWorkerIterationEnd(this.memory.asImmutable());
                         this.messageBoard.completeIteration();
                         this.memory.completeSubRound();
                         if (this.vertexProgram.terminate(this.memory)) {
@@ -150,16 +150,15 @@ public final class TinkerGraphComputer implements GraphComputer {
                         final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
                         final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
                         workers.setMapReduce(mapReduce);
-                        workers.mapReduceWorkerStart(MapReduce.Stage.MAP);
                         workers.executeMapReduce(workerMapReduce -> {
+                            workerMapReduce.workerStart(MapReduce.Stage.MAP);
                             while (true) {
                                 final Vertex vertex = vertices.next();
-                                if (null == vertex) return;
+                                if (null == vertex) break;
                                 workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
                             }
+                            workerMapReduce.workerEnd(MapReduce.Stage.MAP);
                         });
-                        workers.mapReduceWorkerEnd(MapReduce.Stage.MAP);
-
                         // sort results if a map output sort is defined
                         mapEmitter.complete(mapReduce);
 
@@ -167,15 +166,15 @@ public final class TinkerGraphComputer implements GraphComputer {
                         if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
                             final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
                             final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
-                            workers.mapReduceWorkerStart(MapReduce.Stage.REDUCE);
                             workers.executeMapReduce(workerMapReduce -> {
+                                workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
                                 while (true) {
                                     final Map.Entry<?, Queue<?>> entry = keyValues.next();
-                                    if (null == entry) return;
+                                    if (null == entry) break;
                                     workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
                                 }
+                                workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
                             });
-                            workers.mapReduceWorkerEnd(MapReduce.Stage.REDUCE);
                             reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
                             mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
                         } else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1bfdb738/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 5935247..b47809f 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -56,16 +56,6 @@ public final class TinkerWorkerPool implements AutoCloseable {
         this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
     }
 
-    ////
-
-    public void vertexProgramWorkerIterationStart(final Memory memory) {
-        this.vertexProgramPool.workerIterationStart(memory);
-    }
-
-    public void vertexProgramWorkerIterationEnd(final Memory memory) {
-        this.vertexProgramPool.workerIterationEnd(memory);
-    }
-
     public void executeVertexProgram(final Consumer<VertexProgram> worker) {
         try {
             this.workerPool.submit(() -> {
@@ -78,17 +68,6 @@ public final class TinkerWorkerPool implements AutoCloseable {
         }
     }
 
-    ///
-
-    public void mapReduceWorkerStart(final MapReduce.Stage stage) {
-        this.mapReducePool.workerStart(stage);
-    }
-
-    public void mapReduceWorkerEnd(final MapReduce.Stage stage) {
-        this.mapReducePool.workerEnd(stage);
-    }
-
-
     public void executeMapReduce(final Consumer<MapReduce> worker) {
         try {
             this.workerPool.submit(() -> {