You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/06 17:49:46 UTC

incubator-tinkerpop git commit: added various asserts around the different RDDs and their state throughout the SparkGraphComputer computation to ensure that assumptions are correct. For the Spark integration test suite, they are. CTR.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 1c6aa55ad -> bba8d6766


added various asserts around the different RDDs and their state throughout the SparkGraphComputer computation to ensure that assumptions are correct. For the Spark integration test suite, they are. CTR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/bba8d676
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/bba8d676
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/bba8d676

Branch: refs/heads/master
Commit: bba8d67661cd606b1a0db80794c183c215c4695c
Parents: 1c6aa55
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri May 6 11:49:26 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri May 6 11:49:39 2016 -0600

----------------------------------------------------------------------
 .../process/computer/SparkGraphComputer.java    | 28 +++++++++++++++-----
 1 file changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bba8d676/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 5bc040f..9a9e934 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -289,14 +289,21 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                             }
                         }
                         // if the graph will be continued to be used (persisted or mapreduced), then generate a view+graph
-                        if ((null != outputRDD && !this.persist.equals(Persist.NOTHING)) || !this.mapReducers.isEmpty())
+                        if ((null != outputRDD && !this.persist.equals(Persist.NOTHING)) || !this.mapReducers.isEmpty()) {
                             computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
+                            assert null != computedGraphRDD && computedGraphRDD != loadedGraphRDD;
+                        } else {
+                            // ensure that the computedGraphRDD was not created
+                            assert null == computedGraphRDD;
+                        }
                     }
                     /////////////////
                     memory.complete(); // drop all transient memory keys
                     // write the computed graph to the respective output (rdd or output format)
-                    if (null != outputRDD && !this.persist.equals(Persist.NOTHING))
+                    if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
+                        assert null != computedGraphRDD; // the logic holds that a computeGraphRDD must be created at this point
                         outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
+                    }
                 }
 
                 final boolean computedGraphCreated = computedGraphRDD != null && computedGraphRDD != loadedGraphRDD;
@@ -309,15 +316,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // process the map reducers //
                 //////////////////////////////
                 if (!this.mapReducers.isEmpty()) {
+                    // create a mapReduceRDD for executing the map reduce jobs on
+                    JavaPairRDD<Object, VertexWritable> mapReduceRDD = computedGraphRDD;
                     if (computedGraphCreated && !outputToSpark) {
                         // drop all the edges of the graph as they are not used in mapReduce processing
-                        computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
+                        mapReduceRDD = computedGraphRDD.mapValues(vertexWritable -> {
                             vertexWritable.get().dropEdges(Direction.BOTH);
                             return vertexWritable;
                         });
                         // if there is only one MapReduce to execute, don't bother wasting the clock cycles.
                         if (this.mapReducers.size() > 1)
-                            computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                            mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
                     }
 
                     for (final MapReduce mapReduce : this.mapReducers) {
@@ -325,7 +334,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                         mapReduce.storeState(newApacheConfiguration);
                         // map
-                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) computedGraphRDD, mapReduce, newApacheConfiguration);
+                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceRDD, mapReduce, newApacheConfiguration);
                         // combine
                         final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
                         // reduce
@@ -333,7 +342,14 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         // write the map reduce output back to disk and computer result memory
                         if (null != outputRDD)
                             mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), reduceRDD));
-
+                    }
+                    // if the mapReduceRDD is not simply the computed graph, unpersist the mapReduceRDD
+                    if (computedGraphCreated && !outputToSpark) {
+                        assert loadedGraphRDD != computedGraphRDD;
+                        assert mapReduceRDD != computedGraphRDD;
+                        mapReduceRDD.unpersist();
+                    } else {
+                        assert mapReduceRDD == computedGraphRDD;
                     }
                 }