You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 21:35:59 UTC

incubator-tinkerpop git commit: lots of optimization goodies here. learning more and more about Spark as I go and getting clever about it. My powers are increasing. Soon I will crush you all with my army of machines.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 439fbb918 -> 4fb660488


lots of optimization goodies here. learning more and more about Spark as I go and getting clever about it. My powers are increasing. Soon I will crush you all with my army of machines.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4fb66048
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4fb66048
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4fb66048

Branch: refs/heads/master
Commit: 4fb660488f7f008fa8d7baf1548e0354f9342a93
Parents: 439fbb9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 4 13:35:56 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 4 13:35:56 2015 -0700

----------------------------------------------------------------------
 .../process/computer/spark/SparkMessenger.java  | 32 +++++++++++---
 .../computer/spark/util/SparkHelper.java        | 44 +++++++++-----------
 2 files changed, 46 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4fb66048/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 812bdd3..4fea02a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
 import org.apache.tinkerpop.gremlin.process.Traversal;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
 import org.apache.tinkerpop.gremlin.process.graph.traversal.step.map.VertexStep;
@@ -27,13 +28,17 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -42,7 +47,7 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
 
     private Vertex vertex;
     private List<M> incoming;
-    private Map<Object, List<M>> outgoing = new HashMap<>();
+    private Map<Object, List<M>> outgoing;
 
     public SparkMessenger() {
 
@@ -51,6 +56,7 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
     public SparkMessenger(final Vertex vertex) {
         this.vertex = vertex;
         this.incoming = new ArrayList<>();
+        this.outgoing = this.vertex instanceof DetachedVertex ? null : new HashMap<>();
     }
 
     public SparkMessenger(final Vertex vertex, final List<M> incomingMessages) {
@@ -70,12 +76,20 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
         return this.vertex;
     }
 
-    public void setVertex(final Vertex vertex) {
-        this.vertex = vertex;
+    public void mergeInMessenger(final SparkMessenger<M> otherMessenger) {
+        this.vertex = otherMessenger.vertex;
+        this.outgoing = otherMessenger.outgoing;
     }
 
-    public void addIncomingMessages(final SparkMessenger<M> otherMessenger) {
-        this.incoming.addAll(otherMessenger.incoming);
+    public void addIncomingMessages(final SparkMessenger<M> otherMessenger, final Optional<MessageCombiner<M>> messageCombinerOptional) {
+        if (messageCombinerOptional.isPresent()) {
+            final MessageCombiner<M> messageCombiner = messageCombinerOptional.get();
+            final M combinedMessage = Stream.concat(this.incoming.stream(), otherMessenger.incoming.stream()).reduce(messageCombiner::combine).get();
+            this.incoming.clear();
+            this.incoming.add(combinedMessage);
+        } else {
+            this.incoming.addAll(otherMessenger.incoming);
+        }
     }
 
     public Set<Map.Entry<Object, List<M>>> getOutgoingMessages() {
@@ -84,11 +98,17 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
 
     @Override
     public Iterable<M> receiveMessages(final MessageScope messageScope) {
-        return this.incoming;
+        if (null == this.outgoing)
+            throw new IllegalStateException("Message vertices can not receive messages");
+
+        return null == this.incoming ? Collections.emptyList() : this.incoming;
     }
 
     @Override
     public void sendMessage(final MessageScope messageScope, final M message) {
+        if (null == this.outgoing)
+            throw new IllegalStateException("Message vertices can not send messages");
+
         if (messageScope instanceof MessageScope.Local) {
             final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
             final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4fb66048/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index ff23d36..a974f63 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
@@ -47,6 +48,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -69,13 +71,6 @@ public final class SparkHelper {
             });
         });
 
-        // clear all previous incoming messages
-        if (!memory.isInitialIteration()) {  // a quick savings as the initial iteration has no messages yet
-            current = current.mapValues(messenger -> {
-                messenger.clearIncomingMessages();
-                return messenger;
-            });
-        }
         // emit messages by appending them to the graph vertices data as "message vertices"
         current = current.<Object, SparkMessenger<M>>flatMapToPair(keyValue -> () -> new Iterator<Tuple2<Object, SparkMessenger<M>>>() {
             boolean first = true;
@@ -90,6 +85,7 @@ public final class SparkHelper {
             public Tuple2<Object, SparkMessenger<M>> next() {
                 if (this.first) {
                     this.first = false;
+                    keyValue._2().clearIncomingMessages(); // the raw vertex should not have any incoming messages (should be cleared from the previous stage)
                     return new Tuple2<Object, SparkMessenger<M>>(keyValue._1(), keyValue._2()); // this is the raw vertex data
                 } else {
                     final Map.Entry<Object, List<M>> entry = this.iterator.next();
@@ -98,19 +94,19 @@ public final class SparkHelper {
             }
         });
 
-        // TODO: local message combiner
-        if (globalVertexProgram.getMessageCombiner().isPresent()) {
-           /* current = current.combineByKey(messenger -> {
-                return messenger;
-            });*/
-        }
-
         // "message pass" via reduction joining the "message vertices" with the graph vertices
-        current = current.reduceByKey((a, b) -> {
-            if (a.getVertex() instanceof DetachedVertex && !(b.getVertex() instanceof DetachedVertex))
-                a.setVertex(b.getVertex());
-            a.addIncomingMessages(b);
-            return a;  // always reduce on the first argument
+        // addIncomingMessages is provided the vertex program message combiner for partition and global level combining
+        final boolean hasMessageCombiner = globalVertexProgram.getMessageCombiner().isPresent();
+        current = current.reduceByKey((messengerA, messengerB) -> {
+            final Optional<MessageCombiner<M>> messageCombinerOptional = hasMessageCombiner ?
+                    VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner() :  // this is expensive but there is no "reduceByKeyPartition" :(
+                    Optional.empty();
+
+            if (messengerA.getVertex() instanceof DetachedVertex && !(messengerB.getVertex() instanceof DetachedVertex)) // fold the message vertices into the graph vertices
+                messengerA.mergeInMessenger(messengerB);
+            messengerA.addIncomingMessages(messengerB, messageCombinerOptional);
+
+            return messengerA;  // always reduce on the first argument
         });
 
         // clear all previous outgoing messages
@@ -122,10 +118,10 @@ public final class SparkHelper {
     }
 
     public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD, final MapReduce<K, V, ?, ?, ?> globalMapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.mapPartitionsToPair(iterator -> {
+        JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
             final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
-            iterator.forEachRemaining(tuple -> workerMapReduce.map(tuple._2().get(), mapEmitter));
+            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().get(), mapEmitter));
             return mapEmitter.getEmissions();
         });
         if (globalMapReduce.getMapKeySort().isPresent())
@@ -136,10 +132,10 @@ public final class SparkHelper {
     // TODO: public static executeCombine()
 
     public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> globalMapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(iterator -> {
+        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
             final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
-            iterator.forEachRemaining(tuple -> workerMapReduce.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter));
+            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
             return reduceEmitter.getEmissions();
         });
         if (globalMapReduce.getReduceKeySort().isPresent())
@@ -174,7 +170,7 @@ public final class SparkHelper {
         final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         if (null != outputLocation) {
             // map back to a Hadoop stream for output
-            mapReduceRDD.mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+            mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
                     ObjectWritable.class,
                     ObjectWritable.class,
                     (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));