You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/27 23:28:48 UTC

incubator-tinkerpop git commit: organization tweaks around persistence of RDDs. Minor nothings.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 64227b8a3 -> f8a252202


organization tweaks around persistence of RDDs. Minor nothings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f8a25220
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f8a25220
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f8a25220

Branch: refs/heads/master
Commit: f8a252202eb840c13ae96aa30a797c050c8d3624
Parents: 64227b8
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 27 15:28:38 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 27 15:28:38 2016 -0700

----------------------------------------------------------------------
 .../process/computer/SparkGraphComputer.java    | 55 ++++++++++----------
 .../computer/SparkHadoopGraphProvider.java      |  6 ++-
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f8a25220/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 818b9a9..59da7d6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -161,21 +161,21 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 Spark.create(sparkContext.sc()); // this is the context RDD holder that prevents GC
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // create a message-passing friendly rdd from the input rdd
-                JavaPairRDD<Object, VertexWritable> graphRDD;
-                JavaPairRDD<Object, VertexWritable> finalGraphRDD = null;
+                JavaPairRDD<Object, VertexWritable> loadedGraphRDD;
+                JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
                 try {
-                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+                    loadedGraphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
                             .newInstance()
                             .readGraphRDD(apacheConfiguration, sparkContext);
                     if (this.workersSet) {
-                        if (graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
-                            graphRDD = graphRDD.coalesce(this.workers);
-                        else if (graphRDD.partitions().size() < this.workers) // ensures that the graphRDD does not have less partitions than workers
-                            graphRDD = graphRDD.repartition(this.workers);
+                        if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
+                            loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
+                        else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the graphRDD does not have less partitions than workers
+                            loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
                     }
                     // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
                     if (!inputFromSpark)
-                        graphRDD = graphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                        loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
                 } catch (final InstantiationException | IllegalAccessException e) {
                     throw new IllegalStateException(e.getMessage(), e);
                 }
@@ -197,7 +197,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     // execute the vertex program
                     while (true) {
                         memory.setInTask(true);
-                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
                         memory.setInTask(false);
                         if (this.vertexProgram.terminate(memory))
                             break;
@@ -206,24 +206,24 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                             memory.broadcastMemory(sparkContext);
                         }
                     }
-                    // write the graph rdd using the output rdd
+                    // write the computed graph to the respective output (rdd or output format)
                     final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
-                    finalGraphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys);
+                    computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, elementComputeKeys);
                     if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
                             hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) && !this.persist.equals(Persist.NOTHING)) {
                         try {
                             hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
                                     .newInstance()
-                                    .writeGraphRDD(apacheConfiguration, finalGraphRDD);
+                                    .writeGraphRDD(apacheConfiguration, computedGraphRDD);
                         } catch (final InstantiationException | IllegalAccessException e) {
                             throw new IllegalStateException(e.getMessage(), e);
                         }
                     }
                 }
 
-                final boolean finalGraphComputed = finalGraphRDD != null;
-                if (!finalGraphComputed)
-                    finalGraphRDD = graphRDD;
+                final boolean computedGraphCreated = computedGraphRDD != null;
+                if (!computedGraphCreated)
+                    computedGraphRDD = loadedGraphRDD;
 
                 final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
 
@@ -231,25 +231,24 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // process the map reducers //
                 //////////////////////////////
                 if (!this.mapReducers.isEmpty()) {
-                    finalGraphRDD = finalGraphRDD.mapValues(vertexWritable -> {
+                    computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
                         vertexWritable.get().dropEdges();
                         return vertexWritable;
                     });
-                    if (!outputToSpark)
-                        finalGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));  // drop all the edges of the graph as they are not used in mapReduce processing
-                    if (!inputFromSpark && finalGraphComputed) // if there was a final graph computed (that is, a vertex program executed), then drop the loaded graph
-                        graphRDD.unpersist();
+                    if (!outputToSpark && computedGraphCreated)  // if the computed graph wasn't already persisted to spark, persist it here
+                        computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));  // drop all the edges of the graph as they are not used in mapReduce processing
+
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job
                         final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                         mapReduce.storeState(newApacheConfiguration);
                         // map
-                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) finalGraphRDD, mapReduce, newApacheConfiguration);
+                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) computedGraphRDD, mapReduce, newApacheConfiguration);
                         // combine
                         final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
                         // reduce
                         final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
-                        // write the map reduce output back to disk (memory)
+                        // write the map reduce output back to disk and computer result memory
                         try {
                             mapReduce.addResultToMemory(finalMemory,
                                     hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
@@ -261,13 +260,13 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     }
                 }
 
-                // unpersist the loaded graphRDD as it will never be used again
-                if (!inputFromSpark && finalGraphComputed)
-                    graphRDD.unpersist();
-                // unpersist the final computed graphRDD if it will not be used again (no PersistedOutputRDD)
+                // unpersist the loaded graph if it will not be used again (no PersistedInputRDD)
+                if (!inputFromSpark && computedGraphCreated)
+                    loadedGraphRDD.unpersist();
+                // unpersist the computed graph if it will not be used again (no PersistedOutputRDD)
                 if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
-                    finalGraphRDD.unpersist();
-                // delete any file system output if persist nothing
+                    computedGraphRDD.unpersist();
+                // delete any file system or rdd data if persist nothing
                 if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
                     if (outputToHDFS)
                         fileSystemStorage.rm(outputLocation);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f8a25220/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index e2d77c4..19085da 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -25,8 +25,8 @@ import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
 import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -64,6 +64,8 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
 
     @Override
     public GraphTraversalSource traversal(final Graph graph) {
-        return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class).workers(new Random().nextInt(3)+1)).create(graph);
+        return RANDOM.nextBoolean() ?
+                GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class).workers(RANDOM.nextInt(3) + 1)).create(graph) :
+                GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph);
     }
 }
\ No newline at end of file