You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/23 01:16:44 UTC
incubator-tinkerpop git commit: worked with @dkuppitz on a
TinkerWorkerPool issue where startIteration and endIteration were not with
their respective call thread. Exposed when doing bulk loading into Neo4j. Fix
actually helped gut a bunch of code.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/tp30 f09fd8bfc -> 1bfdb738d
worked with @dkuppitz on a TinkerWorkerPool issue where startIteration and endIteration were not with their respective call thread. Exposed when doing bulk loading into Neo4j. Fix actually helped gut a bunch of code.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/1bfdb738
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/1bfdb738
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/1bfdb738
Branch: refs/heads/tp30
Commit: 1bfdb738d9f3f96a8019245403f5f2951db718a1
Parents: f09fd8b
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Sep 22 17:16:36 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Sep 22 17:16:36 2015 -0600
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
.../process/computer/TinkerGraphComputer.java | 19 +++++++++---------
.../process/computer/TinkerWorkerPool.java | 21 --------------------
3 files changed, 10 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1bfdb738/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index d30fad5..ee7e7d9 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
TinkerPop 3.0.2 (NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+* Fixed a thread context bug in `TinkerGraphComputer`.
* Improved session closing for transactional graphs during shutdown of Gremlin Server.
* Fixed id parameter used in tests for `GroovyStoreTest` and `GroovyRepeatTest` to not be treated as an embedded string.
* `GraphStep` will convert any `Vertex` or `Edge` ids to their id `Object` prior to submission to `GraphComputer` (OLAP).
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1bfdb738/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index daaeb70..4d0e631 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -117,20 +117,20 @@ public final class TinkerGraphComputer implements GraphComputer {
this.memory.completeSubRound();
while (true) {
workers.setVertexProgram(this.vertexProgram);
- workers.vertexProgramWorkerIterationStart(this.memory.asImmutable());
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
workers.executeVertexProgram(vertexProgram -> {
+ vertexProgram.workerIterationStart(this.memory.asImmutable());
while (true) {
final Vertex vertex = vertices.next();
- if (null == vertex) return;
+ if (null == vertex) break;
vertexProgram.execute(
ComputerGraph.vertexProgram(vertex, this.vertexProgram),
new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
this.memory
);
}
+ vertexProgram.workerIterationEnd(this.memory.asImmutable());
});
- workers.vertexProgramWorkerIterationEnd(this.memory.asImmutable());
this.messageBoard.completeIteration();
this.memory.completeSubRound();
if (this.vertexProgram.terminate(this.memory)) {
@@ -150,16 +150,15 @@ public final class TinkerGraphComputer implements GraphComputer {
final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
workers.setMapReduce(mapReduce);
- workers.mapReduceWorkerStart(MapReduce.Stage.MAP);
workers.executeMapReduce(workerMapReduce -> {
+ workerMapReduce.workerStart(MapReduce.Stage.MAP);
while (true) {
final Vertex vertex = vertices.next();
- if (null == vertex) return;
+ if (null == vertex) break;
workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
}
+ workerMapReduce.workerEnd(MapReduce.Stage.MAP);
});
- workers.mapReduceWorkerEnd(MapReduce.Stage.MAP);
-
// sort results if a map output sort is defined
mapEmitter.complete(mapReduce);
@@ -167,15 +166,15 @@ public final class TinkerGraphComputer implements GraphComputer {
if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
- workers.mapReduceWorkerStart(MapReduce.Stage.REDUCE);
workers.executeMapReduce(workerMapReduce -> {
+ workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
while (true) {
final Map.Entry<?, Queue<?>> entry = keyValues.next();
- if (null == entry) return;
+ if (null == entry) break;
workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
}
+ workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
});
- workers.mapReduceWorkerEnd(MapReduce.Stage.REDUCE);
reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1bfdb738/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 5935247..b47809f 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -56,16 +56,6 @@ public final class TinkerWorkerPool implements AutoCloseable {
this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
}
- ////
-
- public void vertexProgramWorkerIterationStart(final Memory memory) {
- this.vertexProgramPool.workerIterationStart(memory);
- }
-
- public void vertexProgramWorkerIterationEnd(final Memory memory) {
- this.vertexProgramPool.workerIterationEnd(memory);
- }
-
public void executeVertexProgram(final Consumer<VertexProgram> worker) {
try {
this.workerPool.submit(() -> {
@@ -78,17 +68,6 @@ public final class TinkerWorkerPool implements AutoCloseable {
}
}
- ///
-
- public void mapReduceWorkerStart(final MapReduce.Stage stage) {
- this.mapReducePool.workerStart(stage);
- }
-
- public void mapReduceWorkerEnd(final MapReduce.Stage stage) {
- this.mapReducePool.workerEnd(stage);
- }
-
-
public void executeMapReduce(final Consumer<MapReduce> worker) {
try {
this.workerPool.submit(() -> {