You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 22:12:25 UTC

incubator-tinkerpop git commit: SparkGraphComputer is becoming a well oiled machine... so many heres and theres that are making it more and more memory efficient and lazy.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 4fb660488 -> fca86b9ef


SparkGraphComputer is becoming a well oiled machine... so many heres and theres that are making it more and more memory efficient and lazy.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fca86b9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fca86b9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fca86b9e

Branch: refs/heads/master
Commit: fca86b9efdda5e5c8101b68ba7f873ef654471e4
Parents: 4fb6604
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 4 14:12:21 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 4 14:12:21 2015 -0700

----------------------------------------------------------------------
 .../computer/spark/SparkGraphComputer.java      | 24 +++-----
 .../process/computer/spark/SparkMessenger.java  | 61 +++++++-------------
 .../computer/spark/util/SparkHelper.java        | 38 ++++--------
 3 files changed, 40 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fca86b9e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 560acc0..571f222 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -130,16 +130,17 @@ public final class SparkGraphComputer implements GraphComputer {
                         hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
                             hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
-                        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
-                        SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
-                        ///
-                        try {
+
+                        // execute the vertex program
+                        try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+                            // add the project jars to the cluster
+                            SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
                             // create a message-passing friendly rdd from the hadoop input format
                             JavaPairRDD<Object, SparkMessenger<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                     (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                     NullWritable.class,
                                     VertexWritable.class)
-                                    .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
+                                    .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), SparkMessenger.forGraphVertex(new SparkVertex((TinkerVertex) tuple._2().get()))));
 
                             // set up the vertex program and wire up configurations
                             memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
@@ -164,11 +165,7 @@ public final class SparkGraphComputer implements GraphComputer {
 
                             // write the output graph back to disk
                             SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
-                        } finally {
-                            // must close the context or bad things happen
-                            sparkContext.close();
                         }
-                        sparkContext.close(); // why not try again todo
                     }
 
                     //////////////////////////////
@@ -185,10 +182,9 @@ public final class SparkGraphComputer implements GraphComputer {
                                     hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION) : // if no vertex program grab the graph from the input location
                                     hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
 
-                        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
-                        SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
                         // execute the map reduce job
-                        try {
+                        try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+                            SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
                             final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                     (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                     NullWritable.class,
@@ -204,11 +200,7 @@ public final class SparkGraphComputer implements GraphComputer {
 
                             // write the map reduce output back to disk (memory)
                             SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
-                        } finally {
-                            // must close the context or bad things happen
-                            sparkContext.close();
                         }
-                        sparkContext.close(); // why not try again todo
                     }
 
                     // update runtime and return the newly computed graph

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fca86b9e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 4fea02a..ea6187a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -29,15 +29,13 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
+import scala.Tuple2;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Stream;
 
 /**
@@ -47,21 +45,26 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
 
     private Vertex vertex;
     private List<M> incoming;
-    private Map<Object, List<M>> outgoing;
+    private List<Tuple2<Object, M>> outgoing;
 
-    public SparkMessenger() {
+    private SparkMessenger() {
 
     }
 
-    public SparkMessenger(final Vertex vertex) {
-        this.vertex = vertex;
-        this.incoming = new ArrayList<>();
-        this.outgoing = this.vertex instanceof DetachedVertex ? null : new HashMap<>();
+    public static final <M> SparkMessenger<M> forGraphVertex(final Vertex graphVertex) {
+        final SparkMessenger<M> messenger = new SparkMessenger<>();
+        messenger.vertex = graphVertex;
+        messenger.incoming = new ArrayList<>();
+        messenger.outgoing = new ArrayList<>();
+        return messenger;
     }
 
-    public SparkMessenger(final Vertex vertex, final List<M> incomingMessages) {
-        this.vertex = vertex;
-        this.incoming = incomingMessages;
+    public static final <M> SparkMessenger<M> forMessageVertex(final Object vertexId, final M message) {
+        final SparkMessenger<M> messenger = new SparkMessenger<>();
+        messenger.vertex = new DetachedVertex(vertexId, Vertex.DEFAULT_LABEL, Collections.emptyMap());
+        messenger.incoming = new ArrayList<>();
+        messenger.incoming.add(message);
+        return messenger;
     }
 
     public void clearIncomingMessages() {
@@ -69,7 +72,8 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
     }
 
     public void clearOutgoingMessages() {
-        this.outgoing.clear();
+        if (null != this.outgoing)
+            this.outgoing.clear();
     }
 
     public Vertex getVertex() {
@@ -92,45 +96,24 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
         }
     }
 
-    public Set<Map.Entry<Object, List<M>>> getOutgoingMessages() {
-        return this.outgoing.entrySet();
+    public List<Tuple2<Object, M>> getOutgoingMessages() {
+        return this.outgoing;
     }
 
     @Override
     public Iterable<M> receiveMessages(final MessageScope messageScope) {
-        if (null == this.outgoing)
-            throw new IllegalStateException("Message vertices can not receive messages");
-
-        return null == this.incoming ? Collections.emptyList() : this.incoming;
+        return this.incoming;
     }
 
     @Override
     public void sendMessage(final MessageScope messageScope, final M message) {
-        if (null == this.outgoing)
-            throw new IllegalStateException("Message vertices can not send messages");
-
         if (messageScope instanceof MessageScope.Local) {
             final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
             final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
             final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
-            incidentTraversal.forEachRemaining(edge -> {
-                final Object otherVertexId = edge.iterators().vertexIterator(direction).next().id();
-                List<M> messages = this.outgoing.get(otherVertexId);
-                if (null == messages) {
-                    messages = new ArrayList<>();
-                    this.outgoing.put(otherVertexId, messages);
-                }
-                messages.add(message);
-            });
+            incidentTraversal.forEachRemaining(edge -> this.outgoing.add(new Tuple2<>(edge.iterators().vertexIterator(direction).next().id(), message)));
         } else {
-            ((MessageScope.Global) messageScope).vertices().forEach(v -> {
-                List<M> messages = this.outgoing.get(v.id());
-                if (null == messages) {
-                    messages = new ArrayList<>();
-                    this.outgoing.put(v.id(), messages);
-                }
-                messages.add(message);
-            });
+            ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoing.add(new Tuple2<>(v.id(), message)));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fca86b9e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index a974f63..0c10fbd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -38,16 +38,11 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -61,7 +56,7 @@ public final class SparkHelper {
     public static <M> JavaPairRDD<Object, SparkMessenger<M>> executeStep(final JavaPairRDD<Object, SparkMessenger<M>> graphRDD, final VertexProgram<M> globalVertexProgram, final SparkMemory memory, final Configuration apacheConfiguration) {
         JavaPairRDD<Object, SparkMessenger<M>> current = graphRDD;
         // execute vertex program
-        current = current.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program
+        current = current.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
             final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
             workerVertexProgram.workerIterationStart(memory);
             return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(partitionIterator, keyValue -> {
@@ -71,27 +66,13 @@ public final class SparkHelper {
             });
         });
 
-        // emit messages by appending them to the graph vertices data as "message vertices"
-        current = current.<Object, SparkMessenger<M>>flatMapToPair(keyValue -> () -> new Iterator<Tuple2<Object, SparkMessenger<M>>>() {
-            boolean first = true;
-            final Iterator<Map.Entry<Object, List<M>>> iterator = keyValue._2().getOutgoingMessages().iterator();
-
-            @Override
-            public boolean hasNext() {
-                return this.first || this.iterator.hasNext();
-            }
-
-            @Override
-            public Tuple2<Object, SparkMessenger<M>> next() {
-                if (this.first) {
-                    this.first = false;
-                    keyValue._2().clearIncomingMessages(); // the raw vertex should not have any incoming messages (should be cleared from the previous stage)
-                    return new Tuple2<Object, SparkMessenger<M>>(keyValue._1(), keyValue._2()); // this is the raw vertex data
-                } else {
-                    final Map.Entry<Object, List<M>> entry = this.iterator.next();
-                    return new Tuple2<Object, SparkMessenger<M>>(entry.getKey(), new SparkMessenger<>(new DetachedVertex(entry.getKey(), Vertex.DEFAULT_LABEL, Collections.emptyMap()), entry.getValue()));  // these are the messages
-                }
-            }
+        // emit messages by appending them to the graph vertices as message "vertices"
+        current = current.<Object, SparkMessenger<M>>flatMapToPair(keyValue -> () -> {
+            keyValue._2().clearIncomingMessages(); // the graph vertex should not have any incoming messages (should be cleared from the previous stage)
+            return IteratorUtils.<Tuple2<Object, SparkMessenger<M>>>concat(
+                    IteratorUtils.of(keyValue),                                                                            // this is the graph vertex, emitted first under its own key
+                    IteratorUtils.map(keyValue._2().getOutgoingMessages().iterator(),                                      // each outgoing entry is a (target vertex id, message) tuple
+                            entry -> new Tuple2<>(entry._1(), SparkMessenger.forMessageVertex(entry._1(), entry._2()))));  // this is a message "vertex": keyed by, and detached with, the target vertex id
+        });
 
         // "message pass" via reduction joining the "message vertices" with the graph vertices
@@ -109,11 +90,12 @@ public final class SparkHelper {
             return messengerA;  // always reduce on the first argument
         });
 
-        // clear all previous outgoing messages
+        // clear all previous outgoing messages (why can't we do this prior to the shuffle?)
         current = current.mapValues(messenger -> {
             messenger.clearOutgoingMessages();
             return messenger;
         });
+
         return current;
     }