You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/25 23:07:02 UTC

incubator-tinkerpop git commit: MessageCombiner extends Serializable. Makes sense to do so and makes Spark reducing less error prone.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master ee0af473e -> 57214bcc6


MessageCombiner now extends Serializable. This makes sense for the interface and makes Spark reducing less error-prone.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/57214bcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/57214bcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/57214bcc

Branch: refs/heads/master
Commit: 57214bcc6ace2eb52f29382fab7bc2c06b421928
Parents: ee0af47
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 25 16:06:57 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 25 16:06:57 2015 -0600

----------------------------------------------------------------------
 .../process/computer/MessageCombiner.java       |  4 +++-
 .../process/computer/spark/SparkPayload.java    |  6 ++---
 .../computer/spark/util/SparkHelper.java        | 25 +++++++-------------
 3 files changed, 13 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/57214bcc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
index b9d96c9..4840a40 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
@@ -18,6 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.process.computer;
 
+import java.io.Serializable;
+
 /**
  * A MessageCombiner allows two messages in route to the same vertex to be aggregated into a single message.
  * Message combining can reduce the number of messages sent between vertices and thus, reduce network traffic.
@@ -25,7 +27,7 @@ package org.apache.tinkerpop.gremlin.process.computer;
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface MessageCombiner<M> {
+public interface MessageCombiner<M> extends Serializable {
 
     /**
      * Combine two messages and return a message containing the combination.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/57214bcc/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
index 69980bb..6a6ef92 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Stream;
 
 /**
@@ -29,9 +28,8 @@ import java.util.stream.Stream;
  */
 public interface SparkPayload<M> {
 
-    public default void addMessages(final List<M> otherMessages, final Optional<MessageCombiner<M>> messageCombinerOptional) {
-        if (messageCombinerOptional.isPresent()) {
-            final MessageCombiner<M> messageCombiner = messageCombinerOptional.get();
+    public default void addMessages(final List<M> otherMessages, final MessageCombiner<M> messageCombiner) {
+        if (null != messageCombiner) {
             final M combinedMessage = Stream.concat(this.getMessages().stream(), otherMessages.stream()).reduce(messageCombiner::combine).get();
             this.getMessages().clear();
             this.getMessages().add(combinedMessage);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/57214bcc/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index 976a457..b5c3a68 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
@@ -48,7 +47,6 @@ import scala.Tuple2;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -85,21 +83,14 @@ public final class SparkHelper {
         });
 
         // "message pass" by merging the message payloads with the vertex payloads
-        current = current.reduceByKey(new Function2<SparkPayload<M>, SparkPayload<M>, SparkPayload<M>>() {
-            private Optional<MessageCombiner<M>> messageCombinerOptional = null; // a hack to simulate partition(Spark)/worker(TP3) local variables
-
-            @Override
-            public SparkPayload<M> call(final SparkPayload<M> payloadA, final SparkPayload<M> payloadB) throws Exception {
-                if (null == this.messageCombinerOptional)
-                    this.messageCombinerOptional = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner();
-
-                if (payloadA.isVertex()) {
-                    payloadA.addMessages(payloadB.getMessages(), this.messageCombinerOptional);
-                    return payloadA;
-                } else {
-                    payloadB.addMessages(payloadA.getMessages(), this.messageCombinerOptional);
-                    return payloadB;
-                }
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
+        current = current.reduceByKey((payloadA, payloadB) -> {
+            if (payloadA.isVertex()) {
+                payloadA.addMessages(payloadB.getMessages(), messageCombiner);
+                return payloadA;
+            } else {
+                payloadB.addMessages(payloadA.getMessages(), messageCombiner);
+                return payloadB;
             }
         });