You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/26 23:28:28 UTC

incubator-tinkerpop git commit: Applying reduceByKey as the initial operation on graphRDD ensures there are no duplicate vertices.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master b4fdfd779 -> ba79c0ef4


Applying reduceByKey as the initial operation on graphRDD ensures there are no duplicate vertices.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ba79c0ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ba79c0ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ba79c0ef

Branch: refs/heads/master
Commit: ba79c0ef480b3b6b1eb1c518881e19b1ba1d656a
Parents: b4fdfd7
Author: okram <ok...@apache.org>
Authored: Thu Mar 26 16:28:23 2015 -0600
Committer: okram <ok...@apache.org>
Committed: Thu Mar 26 16:28:23 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java   | 29 +++++++-------------
 1 file changed, 10 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ba79c0ef/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 58f4ad1..f3bdcfd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -49,12 +49,8 @@ public final class SparkExecutor {
     private SparkExecutor() {
     }
 
-    private static final void doNothing() {
-        // a cheap action
-    }
-
     public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
-        JavaPairRDD<Object, SparkPayload<M>> current = graphRDD;
+        JavaPairRDD<Object, SparkPayload<M>> current = graphRDD.reduceByKey((payloadA, payloadB) -> payloadA); // TODO: total hack cause of something weird with recursive RDDs
         // execute vertex program
         current = current.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
             final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
@@ -71,24 +67,19 @@ public final class SparkExecutor {
         });
 
         // emit messages by appending them to the graph as message payloads
-        current = current.<Object, SparkPayload<M>>flatMapToPair(vertex -> {
-            vertex._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
-            return () -> IteratorUtils.concat(
-                    IteratorUtils.of(vertex),
-                    IteratorUtils.map(vertex._2().asVertexPayload().detachOutgoingMessages(),
-                            message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2()))));
-
-        });
+        current = current.<Object, SparkPayload<M>>flatMapToPair(vertex -> () ->
+                IteratorUtils.concat(
+                        IteratorUtils.of(vertex),
+                        IteratorUtils.map(vertex._2().asVertexPayload().detachOutgoingMessages(),
+                                message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2())))));
 
         // "message pass" by merging the message payloads with the vertex payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
         current = current.reduceByKey((payloadA, payloadB) -> {
             if (payloadA.isVertex()) {
-                if (payloadB.isVertex()) {  // TODO: total hack cause of something weird with recursive RDDs
-                    final int sizeA = payloadA.getMessages().size();
-                    final int sizeB = payloadB.getMessages().size();
-                    return sizeA >= sizeB ? payloadA : payloadB;
-                } else {
+                if (payloadB.isVertex())
+                    throw new IllegalStateException("It should not be the case that two vertices reduce to the same key: " + payloadA.asVertexPayload().getVertex() + "==" + payloadB.asVertexPayload().getVertex());
+                else {
                     payloadA.addMessages(payloadB.getMessages(), messageCombiner);
                     return payloadA;
                 }
@@ -97,7 +88,7 @@ public final class SparkExecutor {
                 return payloadB;
             }
         });
-        current.foreach(vertex -> doNothing()); // TODO: i think this is a fast way to execute the rdd (wish there was a "execute()" method).
+        current.foreach(vertex -> {}); // TODO: i think this is a fast way to execute the rdd (wish there was a "execute()" method).
         return current;
     }