Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/29 17:45:22 UTC

incubator-tinkerpop git commit: after a week of benchmarking, finalized the optimizations of SparkGraphComputer. will be sending an email around discussing the findings.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master a6c9f60bc -> 09a5d288c


after a week of benchmarking, finalized the optimizations of SparkGraphComputer. will be sending an email around discussing the findings.
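
For context on the diffs below: the heart of the optimization is keeping the graphRDD and every derived view/message RDD on one shared partitioner, so the per-iteration leftOuterJoin and reduceByKey become narrow, shuffle-free dependencies. A minimal standalone sketch of that idea (this is not TinkerPop code; the Integer-keyed toy data and the 4-partition HashPartitioner are assumptions):

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    import java.util.Arrays;

    public class CoPartitionSketch {
        public static void main(final String[] args) {
            final JavaSparkContext sc = new JavaSparkContext("local[4]", "co-partition-sketch");
            final HashPartitioner partitioner = new HashPartitioner(4);
            // partition the "graph" once up front
            final JavaPairRDD<Integer, String> graphRDD = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>(1, "a"), new Tuple2<>(2, "bb"))).partitionBy(partitioner);
            // derive a "view" that is guaranteed to share the graph's partitioner:
            // mapValues keeps the parent partitioner, and reduceByKey is handed it explicitly
            final JavaPairRDD<Integer, Integer> viewRDD = graphRDD
                    .mapValues(String::length)
                    .reduceByKey(partitioner, (a, b) -> a + b);
            // co-partitioned inputs make this join a narrow (shuffle-free) dependency
            assert graphRDD.partitioner().get().equals(viewRDD.partitioner().get());
            graphRDD.leftOuterJoin(viewRDD).count();
            sc.stop();
        }
    }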


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/09a5d288
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/09a5d288
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/09a5d288

Branch: refs/heads/master
Commit: 09a5d288c4143f2853386ce908c82d9ced3c30e7
Parents: a6c9f60
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Jan 29 09:45:00 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 29 09:45:18 2016 -0700

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 24 ++++++++++++--------
 .../process/computer/SparkGraphComputer.java    | 20 ++++++++++++----
 2 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/09a5d288/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index f1734c8..e0dccab 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -64,10 +64,12 @@ public final class SparkExecutor {
             final SparkMemory memory,
             final Configuration apacheConfiguration) {
 
+        if (null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
+            assert graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get());
         final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
                 graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
                 graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
-                // for each partition of vertices
+                // for each partition of vertices, emit each vertex's view and its outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
                     HadoopPools.initialize(apacheConfiguration);
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
@@ -77,18 +79,18 @@ public final class SparkExecutor {
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
                         final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
-                        // drop any compute properties that are cached in memory
+                        // drop any computed properties that are cached in memory
                         if (elementComputeKeysArray.length > 0)
                             vertex.dropVertexProperties(elementComputeKeysArray);
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
-                        previousView.clear(); // no longer needed so kill it from memory
+                        // previousView.clear(); // no longer needed so kill it from memory
                         ///
                         messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
-                        incomingMessages.clear(); // no longer needed so kill it from memory
+                        // incomingMessages.clear(); // no longer needed so kill it from memory
                         ///
                         final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
@@ -98,15 +100,16 @@ public final class SparkExecutor {
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
                         return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
                     });
-                }));
-
+                }, true)); // true means the partitioning is preserved (the keys are unchanged)
+        // the graphRDD and the viewRDD must have the same partitioner
+        assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
         // "message pass" by reducing on the vertex object id of the view and message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
         final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
                 .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
-                .reduceByKey((a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+                .reduceByKey(graphRDD.partitioner().get(), (a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
                     if (a instanceof ViewIncomingPayload) {
                         ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
                         return a;
@@ -125,7 +128,8 @@ public final class SparkExecutor {
                 .mapValues(payload -> payload instanceof ViewIncomingPayload ?
                         (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
                         new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
-
+        // the graphRDD and the viewRDD must have the same partitioner
+        assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
                     HadoopPools.initialize(apacheConfiguration);
@@ -134,6 +138,8 @@ public final class SparkExecutor {
     }
 
     public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
+        // the graphRDD and the viewRDD must have the same partitioner
+        assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
         // attach the final computed view to the cached graph
         return graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {
@@ -141,7 +147,7 @@ public final class SparkExecutor {
                     vertex.dropVertexProperties(elementComputeKeys);
                     final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
                     view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
-                    view.clear(); // no longer needed so kill it from memory
+                    // view.clear(); // no longer needed so kill it from memory
                     return tuple._1();
                 });
     }
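
The second argument (true) added to mapPartitionsToPair above is Spark's preservesPartitioning flag. It is a promise rather than a command: Spark keeps the parent's partitioner only because the function emits every tuple under its original vertex id key; emitting new keys with the flag set would silently mis-route records. A hedged sketch of that contract, with `in` and `transformValue` as hypothetical stand-ins:

    // hypothetical: `in` is any JavaPairRDD<Object, V>; transformValue() touches only the value
    final JavaPairRDD<Object, String> out = in.mapPartitionsToPair(partitionIterator ->
            () -> IteratorUtils.map(partitionIterator,
                    tuple -> new Tuple2<>(tuple._1(), transformValue(tuple._2()))), // key passes through untouched
            true); // so out inherits in's partitioner and later joins against in stay shuffle-free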

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/09a5d288/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index a38db91..06a8f39 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.spark.HashPartitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -163,18 +164,28 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // create a message-passing friendly rdd from the input rdd
                 JavaPairRDD<Object, VertexWritable> loadedGraphRDD;
                 JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
+                boolean partitioned = false;
                 try {
                     loadedGraphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
                             .newInstance()
                             .readGraphRDD(apacheConfiguration, sparkContext);
+
+                    if (loadedGraphRDD.partitioner().isPresent())
+                        this.logger.info("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
+                    else {
+                        loadedGraphRDD = loadedGraphRDD.partitionBy(new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size()));
+                        partitioned = true;
+                    }
+                    assert loadedGraphRDD.partitioner().isPresent();
+                    // if the loaded graphRDD was already partitioned previously, then this coalesce/repartition will not take place
                     if (this.workersSet) {
-                        if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
+                        if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the loaded graphRDD does not have more partitions than workers
                             loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
-                        else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the graphRDD does not have less partitions than workers
+                        else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the loaded graphRDD does not have less partitions than workers
                             loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
                     }
                     // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
-                    if (!inputFromSpark)
+                    if (!inputFromSpark || partitioned)
                         loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
                 } catch (final InstantiationException | IllegalAccessException e) {
                     throw new IllegalStateException(e.getMessage(), e);
@@ -265,7 +276,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 }
 
                 // unpersist the loaded graph if it will not be used again (no PersistedInputRDD)
-                if (!inputFromSpark && computedGraphCreated)
+                // if the graphRDD was loaded from Spark but then partitioned, it's a different RDD
+                if ((!inputFromSpark || partitioned) && computedGraphCreated)
                     loadedGraphRDD.unpersist();
                 // unpersist the computed graph if it will not be used again (no PersistedOutputRDD)
                 if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
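
A condensed view of the lifecycle change above: partitionBy shuffles into a brand-new RDD, which is why a Spark-resident input that had to be repartitioned is now persisted like a freshly loaded one and unpersisted under the matching condition. The names below mirror the diff (workers, inputFromSpark, computedGraphCreated), but readGraphRDD() is a hypothetical stand-in and the sketch is illustrative, not the actual method body:

    JavaPairRDD<Object, VertexWritable> loadedGraphRDD = readGraphRDD(); // hypothetical loader
    boolean partitioned = false;
    if (!loadedGraphRDD.partitioner().isPresent()) {
        // no partitioner on the input: impose one so all later joins/reduces line up
        loadedGraphRDD = loadedGraphRDD.partitionBy(new HashPartitioner(workers));
        partitioned = true; // partitionBy returned a new RDD, distinct from any persisted input
    }
    if (!inputFromSpark || partitioned) // a repartitioned Spark input is effectively a new graph
        loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.MEMORY_ONLY());
    // ... iterate the vertex program ...
    if ((!inputFromSpark || partitioned) && computedGraphCreated)
        loadedGraphRDD.unpersist(); // nothing downstream reads this RDD again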