You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/25 23:07:02 UTC
incubator-tinkerpop git commit: MessageCombiner extends Serializable.
This is a natural fit, and it makes Spark's reduce step less error-prone.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master ee0af473e -> 57214bcc6
MessageCombiner extends Serializable. This is a natural fit, and it makes Spark's reduce step less error-prone.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/57214bcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/57214bcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/57214bcc
Branch: refs/heads/master
Commit: 57214bcc6ace2eb52f29382fab7bc2c06b421928
Parents: ee0af47
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 25 16:06:57 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 25 16:06:57 2015 -0600
----------------------------------------------------------------------
.../process/computer/MessageCombiner.java | 4 +++-
.../process/computer/spark/SparkPayload.java | 6 ++---
.../computer/spark/util/SparkHelper.java | 25 +++++++-------------
3 files changed, 13 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/57214bcc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
index b9d96c9..4840a40 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
@@ -18,6 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.process.computer;
+import java.io.Serializable;
+
/**
* A MessageCombiner allows two messages in route to the same vertex to be aggregated into a single message.
* Message combining can reduce the number of messages sent between vertices and thus, reduce network traffic.
@@ -25,7 +27,7 @@ package org.apache.tinkerpop.gremlin.process.computer;
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public interface MessageCombiner<M> {
+public interface MessageCombiner<M> extends Serializable {
/**
* Combine two messages and return a message containing the combination.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/57214bcc/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
index 69980bb..6a6ef92 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Stream;
/**
@@ -29,9 +28,8 @@ import java.util.stream.Stream;
*/
public interface SparkPayload<M> {
- public default void addMessages(final List<M> otherMessages, final Optional<MessageCombiner<M>> messageCombinerOptional) {
- if (messageCombinerOptional.isPresent()) {
- final MessageCombiner<M> messageCombiner = messageCombinerOptional.get();
+ public default void addMessages(final List<M> otherMessages, final MessageCombiner<M> messageCombiner) {
+ if (null != messageCombiner) {
final M combinedMessage = Stream.concat(this.getMessages().stream(), otherMessages.stream()).reduce(messageCombiner::combine).get();
this.getMessages().clear();
this.getMessages().add(combinedMessage);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/57214bcc/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index 976a457..b5c3a68 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
@@ -48,7 +47,6 @@ import scala.Tuple2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
/**
@@ -85,21 +83,14 @@ public final class SparkHelper {
});
// "message pass" by merging the message payloads with the vertex payloads
- current = current.reduceByKey(new Function2<SparkPayload<M>, SparkPayload<M>, SparkPayload<M>>() {
- private Optional<MessageCombiner<M>> messageCombinerOptional = null; // a hack to simulate partition(Spark)/worker(TP3) local variables
-
- @Override
- public SparkPayload<M> call(final SparkPayload<M> payloadA, final SparkPayload<M> payloadB) throws Exception {
- if (null == this.messageCombinerOptional)
- this.messageCombinerOptional = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner();
-
- if (payloadA.isVertex()) {
- payloadA.addMessages(payloadB.getMessages(), this.messageCombinerOptional);
- return payloadA;
- } else {
- payloadB.addMessages(payloadA.getMessages(), this.messageCombinerOptional);
- return payloadB;
- }
+ final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
+ current = current.reduceByKey((payloadA, payloadB) -> {
+ if (payloadA.isVertex()) {
+ payloadA.addMessages(payloadB.getMessages(), messageCombiner);
+ return payloadA;
+ } else {
+ payloadB.addMessages(payloadA.getMessages(), messageCombiner);
+ return payloadB;
}
});