You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/26 00:29:28 UTC

incubator-tinkerpop git commit: not using 'on the fly' iterators but a legit iterable for Spark message passing.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master b521ae5b4 -> 74123046b


Use a materialized Iterable (an ArrayList) instead of lazily built 'on the fly' iterators for Spark message passing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/74123046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/74123046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/74123046

Branch: refs/heads/master
Commit: 74123046b5ab618f6a4e4e8d2707676560bd8457
Parents: b521ae5
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 25 17:29:24 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 25 17:29:24 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/util/SparkHelper.java       | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74123046/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index ed54d9a..f928172 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -41,7 +41,6 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 
 import java.io.IOException;
@@ -74,12 +73,11 @@ public final class SparkHelper {
         });
 
         // emit messages by appending them to the graph as message payloads
-        current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> () -> {
-            keyValue._2().asVertexPayload().getMessages().clear(); // the graph vertex should not have any incoming messages (should be cleared from the previous stage)
-            return IteratorUtils.concat(
-                    IteratorUtils.of(keyValue),                                                                    // this is a vertex
-                    IteratorUtils.map(keyValue._2().asVertexPayload().getOutgoingMessages().iterator(),
-                            entry -> new Tuple2<>(entry._1(), new SparkMessagePayload<M>(entry._2()))));           // this is a message;
+        current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> {
+            final List<Tuple2<Object, SparkPayload<M>>> list = new ArrayList<>();
+            list.add(keyValue);    // this is a vertex
+            keyValue._2().asVertexPayload().getOutgoingMessages().forEach(message -> list.add(new Tuple2<>(message._1(), new SparkMessagePayload<>((M) message._2())))); // this is a message
+            return list;
         });
 
         // "message pass" by merging the message payloads with the vertex payloads
@@ -105,6 +103,7 @@ public final class SparkHelper {
 
         // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably cause of concurrent modification issues prior to reduceByKey)
         current = current.mapValues(vertexPayload -> {
+            // vertexPayload.asVertexPayload().getMessages().clear();
             vertexPayload.asVertexPayload().getOutgoingMessages().clear();
             return vertexPayload;
         });