You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/25 23:52:02 UTC

incubator-tinkerpop git commit: assuming reduce needs new objects to be returned.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 8a94e3942 -> b521ae5b4


assuming reduce needs new objects to be returned.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b521ae5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b521ae5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b521ae5b

Branch: refs/heads/master
Commit: b521ae5b4f6340688f3a84db63b70aa208e3e881
Parents: 8a94e39
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 25 16:51:59 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 25 16:51:59 2015 -0600

----------------------------------------------------------------------
 .../computer/spark/SparkMessagePayload.java        |  2 +-
 .../process/computer/spark/SparkPayload.java       |  1 -
 .../process/computer/spark/util/SparkHelper.java   | 17 +++++++++++++----
 3 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b521ae5b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
index 43e34ab..4e58cf1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
@@ -29,7 +29,7 @@ public final class SparkMessagePayload<M> implements Serializable, SparkPayload<
 
     public final List<M> messages = new ArrayList<>();
 
-    private SparkMessagePayload() {
+    public SparkMessagePayload() {
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b521ae5b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
index 19c2b7c..11f98bb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
@@ -48,5 +48,4 @@ public interface SparkPayload<M> {
     public default SparkVertexPayload<M> asVertexPayload() {
         return (SparkVertexPayload<M>) this;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b521ae5b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index b5c3a68..ed54d9a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -86,11 +86,20 @@ public final class SparkHelper {
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
         current = current.reduceByKey((payloadA, payloadB) -> {
             if (payloadA.isVertex()) {
-                payloadA.addMessages(payloadB.getMessages(), messageCombiner);
-                return payloadA;
+                final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadA.asVertexPayload().getVertex());
+                vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
+                vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
+                return vertexPayload;
+            } else if (payloadB.isVertex()) {
+                final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadB.asVertexPayload().getVertex());
+                vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
+                vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
+                return vertexPayload;
             } else {
-                payloadB.addMessages(payloadA.getMessages(), messageCombiner);
-                return payloadB;
+                final SparkMessagePayload<M> messagePayload = new SparkMessagePayload<>();
+                messagePayload.addMessages(payloadA.getMessages(), messageCombiner);
+                messagePayload.addMessages(payloadB.getMessages(), messageCombiner);
+                return messagePayload;
             }
         });