You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/26 22:46:22 UTC

incubator-tinkerpop git commit: being smart about payload type. still the inane problem on large recursive RDDs.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 774a87e04 -> b4fdfd779


being smart about payload type. still the inane problem on large recursive RDDs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b4fdfd77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b4fdfd77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b4fdfd77

Branch: refs/heads/master
Commit: b4fdfd779aced3366da5f5834513d71cdd639d94
Parents: 774a87e
Author: okram <ok...@apache.org>
Authored: Thu Mar 26 15:46:18 2015 -0600
Committer: okram <ok...@apache.org>
Committed: Thu Mar 26 15:46:18 2015 -0600

----------------------------------------------------------------------
 .../hadoop/process/computer/spark/SparkExecutor.java    | 12 +++++++++---
 .../process/computer/spark/SparkGraphComputer.java      |  2 +-
 .../hadoop/process/computer/spark/SparkPayload.java     | 11 +++--------
 3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4fdfd77/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 44ac16b..58f4ad1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -75,7 +75,7 @@ public final class SparkExecutor {
             vertex._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
             return () -> IteratorUtils.concat(
                     IteratorUtils.of(vertex),
-                    IteratorUtils.map(vertex._2().asVertexPayload().getOutgoingMessages().iterator(),
+                    IteratorUtils.map(vertex._2().asVertexPayload().detachOutgoingMessages(),
                             message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2()))));
 
         });
@@ -84,8 +84,14 @@ public final class SparkExecutor {
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
         current = current.reduceByKey((payloadA, payloadB) -> {
             if (payloadA.isVertex()) {
-                payloadA.addMessages(payloadB.getMessages(), messageCombiner);
-                return payloadA;
+                if (payloadB.isVertex()) {  // TODO: total hack cause of something weird with recursive RDDs
+                    final int sizeA = payloadA.getMessages().size();
+                    final int sizeB = payloadB.getMessages().size();
+                    return sizeA >= sizeB ? payloadA : payloadB;
+                } else {
+                    payloadA.addMessages(payloadB.getMessages(), messageCombiner);
+                    return payloadA;
+                }
             } else {
                 payloadB.addMessages(payloadA.getMessages(), messageCombiner);
                 return payloadB;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4fdfd77/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 733f2a2..fc83bd6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -208,7 +208,7 @@ public final class SparkGraphComputer implements GraphComputer {
                                 vertex.asVertexPayload().getOutgoingMessages().clear();
                                 vertex.asVertexPayload().getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
                                 return vertex;
-                            });
+                            });   // todo: cache()?
                             for (final MapReduce mapReduce : this.mapReducers) {
                                 // execute the map reduce job
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4fdfd77/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
index ebb4b18..3ec1d5b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 
 import java.util.List;
+import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -29,15 +30,9 @@ public interface SparkPayload<M> {
 
     public default void addMessages(final List<M> otherMessages, final MessageCombiner<M> messageCombiner) {
         if (null != messageCombiner) {
-            M message = null;
-            for (final M m : this.getMessages()) {
-                message = null == message ? m : messageCombiner.combine(message, m);
-            }
-            for (final M m : otherMessages) {
-                message = null == message ? m : messageCombiner.combine(message, m);
-            }
+            final M message = Stream.concat(this.getMessages().stream(),otherMessages.stream()).reduce(messageCombiner::combine).get();
             this.getMessages().clear();
-            if (null != message) this.getMessages().add(message);
+            this.getMessages().add(message);
         } else {
             this.getMessages().addAll(otherMessages);
         }