You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/26 00:29:28 UTC
incubator-tinkerpop git commit: Not using 'on-the-fly' iterators, but a proper iterable, for Spark message passing.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master b521ae5b4 -> 74123046b
Not using 'on-the-fly' iterators, but a proper (materialized) iterable, for Spark message passing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/74123046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/74123046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/74123046
Branch: refs/heads/master
Commit: 74123046b5ab618f6a4e4e8d2707676560bd8457
Parents: b521ae5
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 25 17:29:24 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 25 17:29:24 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/util/SparkHelper.java | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74123046/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index ed54d9a..f928172 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -41,7 +41,6 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import java.io.IOException;
@@ -74,12 +73,11 @@ public final class SparkHelper {
});
// emit messages by appending them to the graph as message payloads
- current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> () -> {
- keyValue._2().asVertexPayload().getMessages().clear(); // the graph vertex should not have any incoming messages (should be cleared from the previous stage)
- return IteratorUtils.concat(
- IteratorUtils.of(keyValue), // this is a vertex
- IteratorUtils.map(keyValue._2().asVertexPayload().getOutgoingMessages().iterator(),
- entry -> new Tuple2<>(entry._1(), new SparkMessagePayload<M>(entry._2())))); // this is a message;
+ current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> {
+ final List<Tuple2<Object, SparkPayload<M>>> list = new ArrayList<>();
+ list.add(keyValue); // this is a vertex
+ keyValue._2().asVertexPayload().getOutgoingMessages().forEach(message -> list.add(new Tuple2<>(message._1(), new SparkMessagePayload<>((M) message._2())))); // this is a message
+ return list;
});
// "message pass" by merging the message payloads with the vertex payloads
@@ -105,6 +103,7 @@ public final class SparkHelper {
// clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably cause of concurrent modification issues prior to reduceByKey)
current = current.mapValues(vertexPayload -> {
+ // vertexPayload.asVertexPayload().getMessages().clear();
vertexPayload.asVertexPayload().getOutgoingMessages().clear();
return vertexPayload;
});