You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/05 00:52:12 UTC
incubator-tinkerpop git commit: more minor realizations about Spark.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 1126869e4 -> 57ae2b681
more minor realizations about Spark.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/57ae2b68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/57ae2b68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/57ae2b68
Branch: refs/heads/master
Commit: 57ae2b681b8090a85a951ea0e1ec77ad0fd743f3
Parents: 1126869
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 4 16:52:09 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 4 16:52:09 2015 -0700
----------------------------------------------------------------------
.../computer/spark/util/SparkHelper.java | 21 ++++++++++----------
1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/57ae2b68/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index 095207a..c7c7dc6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -67,17 +67,16 @@ public final class SparkHelper {
});
});
- // emit messages by appending them to the graph vertices as message "vertices"
+ // emit messages by appending them to the graph as message payloads
current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> () -> {
keyValue._2().asVertexPayload().getMessages().clear(); // the graph vertex should not have any incoming messages (should be cleared from the previous stage)
- return IteratorUtils.<Tuple2<Object, SparkPayload<M>>>concat(
- IteratorUtils.of(keyValue),
- IteratorUtils.map(keyValue._2().asVertexPayload().getOutgoingMessages().iterator(), // this is a vertex
- entry -> new Tuple2<>(entry._1(), new SparkMessagePayload<>(entry._2())))); // this is a message;
+ return IteratorUtils.concat(
+ IteratorUtils.of(keyValue), // this is a vertex
+ IteratorUtils.map(keyValue._2().asVertexPayload().getOutgoingMessages().iterator(),
+ entry -> new Tuple2<>(entry._1(), new SparkMessagePayload<M>(entry._2())))); // this is a message;
});
- // "message pass" via reduction joining the "message vertices" with the graph vertices
- // addMessages is provided the vertex program message combiner for partition and global level combining
+ // "message pass" by merging the message payloads with the vertex payloads
current = current.reduceByKey(new Function2<SparkPayload<M>, SparkPayload<M>, SparkPayload<M>>() {
private Optional<MessageCombiner<M>> messageCombinerOptional = null; // a hack to simulate partition(Spark)/worker(TP3) local variables
@@ -96,10 +95,10 @@ public final class SparkHelper {
}
});
- // clear all previous outgoing messages (why can't we do this prior to the shuffle?)
- current = current.mapValues(messenger -> {
- messenger.asVertexPayload().getOutgoingMessages().clear();
- return messenger;
+ // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably because of concurrent modification issues prior to reduceByKey)
+ current = current.mapValues(vertexPayload -> {
+ vertexPayload.asVertexPayload().getOutgoingMessages().clear();
+ return vertexPayload;
});
return current;