You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/25 23:52:02 UTC
incubator-tinkerpop git commit: assuming reduce needs new objects to be returned.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 8a94e3942 -> b521ae5b4
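Assuming reduce needs new objects to be returned: the reduceByKey function in SparkHelper now builds and returns a fresh payload instead of mutating and returning one of its inputs.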
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b521ae5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b521ae5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b521ae5b
Branch: refs/heads/master
Commit: b521ae5b4f6340688f3a84db63b70aa208e3e881
Parents: 8a94e39
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 25 16:51:59 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 25 16:51:59 2015 -0600
----------------------------------------------------------------------
.../computer/spark/SparkMessagePayload.java | 2 +-
.../process/computer/spark/SparkPayload.java | 1 -
.../process/computer/spark/util/SparkHelper.java | 17 +++++++++++++----
3 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b521ae5b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
index 43e34ab..4e58cf1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
@@ -29,7 +29,7 @@ public final class SparkMessagePayload<M> implements Serializable, SparkPayload<
public final List<M> messages = new ArrayList<>();
- private SparkMessagePayload() {
+ public SparkMessagePayload() {
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b521ae5b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
index 19c2b7c..11f98bb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
@@ -48,5 +48,4 @@ public interface SparkPayload<M> {
public default SparkVertexPayload<M> asVertexPayload() {
return (SparkVertexPayload<M>) this;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b521ae5b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index b5c3a68..ed54d9a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -86,11 +86,20 @@ public final class SparkHelper {
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
current = current.reduceByKey((payloadA, payloadB) -> {
if (payloadA.isVertex()) {
- payloadA.addMessages(payloadB.getMessages(), messageCombiner);
- return payloadA;
+ final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadA.asVertexPayload().getVertex());
+ vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
+ vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
+ return vertexPayload;
+ } else if (payloadB.isVertex()) {
+ final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadB.asVertexPayload().getVertex());
+ vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
+ vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
+ return vertexPayload;
} else {
- payloadB.addMessages(payloadA.getMessages(), messageCombiner);
- return payloadB;
+ final SparkMessagePayload<M> messagePayload = new SparkMessagePayload<>();
+ messagePayload.addMessages(payloadA.getMessages(), messageCombiner);
+ messagePayload.addMessages(payloadB.getMessages(), messageCombiner);
+ return messagePayload;
}
});