Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/05 00:52:12 UTC

incubator-tinkerpop git commit: more minor realizations about Spark.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 1126869e4 -> 57ae2b681


more minor realizations about Spark.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/57ae2b68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/57ae2b68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/57ae2b68

Branch: refs/heads/master
Commit: 57ae2b681b8090a85a951ea0e1ec77ad0fd743f3
Parents: 1126869
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 4 16:52:09 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 4 16:52:09 2015 -0700

----------------------------------------------------------------------
 .../computer/spark/util/SparkHelper.java        | 21 ++++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/57ae2b68/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index 095207a..c7c7dc6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -67,17 +67,16 @@ public final class SparkHelper {
             });
         });
 
-        // emit messages by appending them to the graph vertices as message "vertices"
+        // emit messages by appending them to the graph as message payloads
         current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> () -> {
             keyValue._2().asVertexPayload().getMessages().clear(); // the graph vertex should not have any incoming messages (should be cleared from the previous stage)
-            return IteratorUtils.<Tuple2<Object, SparkPayload<M>>>concat(
-                    IteratorUtils.of(keyValue),
-                    IteratorUtils.map(keyValue._2().asVertexPayload().getOutgoingMessages().iterator(),            // this is a vertex
-                            entry -> new Tuple2<>(entry._1(), new SparkMessagePayload<>(entry._2()))));            // this is a message;
+            return IteratorUtils.concat(
+                    IteratorUtils.of(keyValue),                                                                    // this is a vertex
+                    IteratorUtils.map(keyValue._2().asVertexPayload().getOutgoingMessages().iterator(),
+                            entry -> new Tuple2<>(entry._1(), new SparkMessagePayload<M>(entry._2()))));           // this is a message;
         });
 
-        // "message pass" via reduction joining the "message vertices" with the graph vertices
-        // addMessages is provided the vertex program message combiner for partition and global level combining
+        // "message pass" by merging the message payloads with the vertex payloads
         current = current.reduceByKey(new Function2<SparkPayload<M>, SparkPayload<M>, SparkPayload<M>>() {
             private Optional<MessageCombiner<M>> messageCombinerOptional = null; // a hack to simulate partition(Spark)/worker(TP3) local variables
 
@@ -96,10 +95,10 @@ public final class SparkHelper {
             }
         });
 
-        // clear all previous outgoing messages (why can't we do this prior to the shuffle?)
-        current = current.mapValues(messenger -> {
-            messenger.asVertexPayload().getOutgoingMessages().clear();
-            return messenger;
+        // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably because of concurrent modification issues prior to reduceByKey)
+        current = current.mapValues(vertexPayload -> {
+            vertexPayload.asVertexPayload().getOutgoingMessages().clear();
+            return vertexPayload;
         });
 
         return current;
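
For context beyond the hunk above: the method shuffles a vertex-keyed RDD in three steps -- emit each vertex together with detached message payloads (flatMapToPair), merge the messages back into their target vertex (reduceByKey), then clear the now-delivered outgoing messages (mapValues). Below is a minimal, self-contained sketch of that pattern. The Payload class and every name in it are simplified stand-ins invented for illustration, not TinkerPop's actual SparkPayload/SparkMessagePayload types, and it assumes Spark 2.x's Java API, where flatMapToPair returns an Iterator (the Spark 1.x API contemporary with this commit returned an Iterable).

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class MessagePassSketch {

    // Hypothetical stand-in for SparkPayload: either a vertex payload (with
    // outgoing messages keyed by destination id) or a detached message payload.
    public static final class Payload implements Serializable {
        final boolean isVertex;
        final List<Tuple2<Long, Integer>> outgoing = new ArrayList<>(); // (destination id, message)
        final List<Integer> incoming = new ArrayList<>();

        Payload(final boolean isVertex) {
            this.isVertex = isVertex;
        }
    }

    public static void main(final String[] args) {
        final SparkConf conf = new SparkConf().setAppName("message-pass-sketch").setMaster("local[2]");
        try (final JavaSparkContext sc = new JavaSparkContext(conf)) {
            final Payload v1 = new Payload(true);
            v1.outgoing.add(new Tuple2<>(2L, 42)); // vertex 1 sends message 42 to vertex 2
            final Payload v2 = new Payload(true);
            JavaPairRDD<Long, Payload> current =
                    sc.parallelizePairs(Arrays.asList(new Tuple2<>(1L, v1), new Tuple2<>(2L, v2)));

            // (1) emit: each vertex yields itself plus one message payload per outgoing message
            current = current.<Long, Payload>flatMapToPair(keyValue -> {
                final List<Tuple2<Long, Payload>> out = new ArrayList<>();
                out.add(keyValue);                                       // this is a vertex
                for (final Tuple2<Long, Integer> entry : keyValue._2().outgoing) {
                    final Payload message = new Payload(false);
                    message.incoming.add(entry._2());
                    out.add(new Tuple2<>(entry._1(), message));          // this is a message
                }
                return out.iterator();
            });

            // (2) "message pass": reduceByKey folds message payloads into the vertex payload;
            // when two message payloads meet first, they simply merge with each other
            current = current.reduceByKey((a, b) -> {
                final Payload keep = a.isVertex ? a : b;
                final Payload other = keep == a ? b : a;
                keep.incoming.addAll(other.incoming); // a real MessageCombiner could fold here instead
                return keep;
            });

            // (3) clear the now-delivered outgoing messages, only after the shuffle
            current = current.mapValues(vertexPayload -> {
                vertexPayload.outgoing.clear();
                return vertexPayload;
            });

            current.collect().forEach(kv ->
                    System.out.println("vertex " + kv._1() + " received " + kv._2().incoming));
        }
    }
}

The ordering of step (3) bears on the question raised in the commit's own comment: the emit stage hands out lazy iterators over each vertex's outgoing-message list, so clearing that list before the shuffle has consumed those iterators could trigger exactly the concurrent-modification problem the comment suspects; clearing in a mapValues after reduceByKey touches only fully merged vertex payloads.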