Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/27 18:58:15 UTC

incubator-tinkerpop git commit: found an RDD persistence 'memory bug' in SparkGraphComputer while doing large-scale testing with @dkuppitz using 4 Blades and the 2-billion-edge Friendster dataset. Fixed. CTR.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 4dd89e914 -> e5e7cbaa9


found an RDD persistence 'memory bug' in SparkGraphComputer while doing large-scale testing with @dkuppitz using 4 Blades and the 2-billion-edge Friendster dataset. Fixed. CTR.
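
The heart of the change is a standard Spark caching rule: a cached parent RDD should only be unpersisted after the RDD derived from it has actually been materialized by an action (a write, a count, etc.); unpersisting earlier throws away the cached partitions and can force the whole lineage to be recomputed. The fix below keeps the loaded graph RDD and the final computed graph RDD as separate references and releases the loaded one only once the final one has been produced. A minimal standalone sketch of that ordering rule (class and variable names here are illustrative, not part of this commit):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;
    import scala.Tuple2;

    import java.util.Arrays;

    public class PersistOrderingSketch {
        public static void main(final String[] args) {
            final JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("persist-ordering-sketch").setMaster("local[2]"));
            // the cached "input" RDD, standing in for the loaded graphRDD
            final JavaPairRDD<Integer, String> inputRDD = sc.parallelize(Arrays.asList(1, 2, 3))
                    .mapToPair(i -> new Tuple2<>(i, "v" + i))
                    .persist(StorageLevel.MEMORY_ONLY());
            // the derived RDD, standing in for the final computed graphRDD
            final JavaPairRDD<Integer, String> finalRDD = inputRDD
                    .mapValues(v -> v + "-computed")
                    .persist(StorageLevel.MEMORY_ONLY());
            finalRDD.count();     // materialize the derived RDD first (an action)
            inputRDD.unpersist(); // only then drop the input's cached partitions
            sc.stop();
        }
    }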


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e5e7cbaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e5e7cbaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e5e7cbaa

Branch: refs/heads/master
Commit: e5e7cbaa91863e45392ce994782f6855dc680e5e
Parents: 4dd89e9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 27 10:57:59 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 27 10:58:10 2016 -0700

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   |  7 ++---
 .../process/computer/SparkGraphComputer.java    | 29 ++++++++++++++------
 .../computer/SparkHadoopGraphProvider.java      |  2 +-
 3 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e5e7cbaa/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 5889bdb..f1734c8 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -133,9 +133,9 @@ public final class SparkExecutor {
         return newViewIncomingRDD;
     }
 
-    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys, final boolean unpersistInput) {
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
         // attach the final computed view to the cached graph
-        final JavaPairRDD<Object, VertexWritable> finalGraphRDD = graphRDD.leftOuterJoin(viewIncomingRDD)
+        return graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {
                     final StarGraph.StarVertex vertex = tuple._1().get();
                     vertex.dropVertexProperties(elementComputeKeys);
@@ -144,9 +144,6 @@ public final class SparkExecutor {
                     view.clear(); // no longer needed so kill it from memory
                     return tuple._1();
                 });
-        if (unpersistInput)
-            graphRDD.unpersist();
-        return finalGraphRDD;
     }
 
     /////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e5e7cbaa/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 58a0af6..60da1d7 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -162,6 +162,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 JavaPairRDD<Object, VertexWritable> graphRDD;
+                JavaPairRDD<Object, VertexWritable> finalGraphRDD = null;
                 try {
                     graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
                             .newInstance()
@@ -207,36 +208,43 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     }
                     // write the graph rdd using the output rdd
                     final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
-                    graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys, !inputFromSpark);
+                    finalGraphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys);
                     if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
-                            hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) &&
-                            !this.persist.equals(GraphComputer.Persist.NOTHING)) {
+                            hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) && !this.persist.equals(Persist.NOTHING)) {
                         try {
                             hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
                                     .newInstance()
-                                    .writeGraphRDD(apacheConfiguration, graphRDD);
+                                    .writeGraphRDD(apacheConfiguration, finalGraphRDD);
                         } catch (final InstantiationException | IllegalAccessException e) {
                             throw new IllegalStateException(e.getMessage(), e);
                         }
                     }
                 }
 
+                final boolean finalGraphComputed = finalGraphRDD != null;
+                if (!finalGraphComputed)
+                    finalGraphRDD = graphRDD;
+
                 final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
 
                 //////////////////////////////
                 // process the map reducers //
                 //////////////////////////////
                 if (!this.mapReducers.isEmpty()) {
-                    final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = graphRDD.mapValues(vertexWritable -> {
+                    finalGraphRDD = finalGraphRDD.mapValues(vertexWritable -> {
                         vertexWritable.get().dropEdges();
                         return vertexWritable;
-                    });  // drop all the edges of the graph as they are not used in mapReduce processing
+                    });
+                    if (!outputToSpark)
+                        finalGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));  // drop all the edges of the graph as they are not used in mapReduce processing
+                    if (!inputFromSpark && finalGraphComputed) // if there was a final graph computed (that is, a vertex program executed), then drop the loaded graph
+                        graphRDD.unpersist();
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job
                         final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                         mapReduce.storeState(newApacheConfiguration);
                         // map
-                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
+                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) finalGraphRDD, mapReduce, newApacheConfiguration);
                         // combine
                         final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
                         // reduce
@@ -253,9 +261,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     }
                 }
 
-                // unpersist the graphRDD if it will no longer be used
-                if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
+                // unpersist the loaded graphRDD as it will never be used again
+                if (!inputFromSpark && finalGraphComputed)
                     graphRDD.unpersist();
+                // unpersist the final computed graphRDD if it will not be used again (no PersistedOutputRDD)
+                if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
+                    finalGraphRDD.unpersist();
                 // delete any file system output if persist nothing
                 if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
                     if (outputToHDFS)

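The new persist call in the hunk above resolves its storage level from the job configuration, falling back to MEMORY_ONLY. A standalone sketch of that lookup, assuming the Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL key resolves to the property name written out below (the key string and class name are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.storage.StorageLevel;

    public class StorageLevelLookupSketch {
        public static void main(final String[] args) {
            final Configuration hadoopConfiguration = new Configuration();
            // hadoopConfiguration.set("gremlin.spark.graphStorageLevel", "MEMORY_AND_DISK"); // optional override
            final StorageLevel level = StorageLevel.fromString(
                    hadoopConfiguration.get("gremlin.spark.graphStorageLevel", "MEMORY_ONLY"));
            System.out.println(level.toString());
        }
    }
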
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e5e7cbaa/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 108d0ed..e2d77c4 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -64,6 +64,6 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
 
     @Override
     public GraphTraversalSource traversal(final Graph graph) {
-        return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph);
+        return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class).workers(new Random().nextInt(3)+1)).create(graph);
     }
 }
\ No newline at end of file
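
The test provider change above makes each test traversal run with a randomized worker count: new Random().nextInt(3) returns 0, 1, or 2, so the computer is always built with 1 to 3 workers. A trivial standalone illustration (class name is made up):

    import java.util.Random;

    public class WorkerCountSketch {
        public static void main(final String[] args) {
            final Random random = new Random();
            for (int i = 0; i < 5; i++)
                System.out.println(random.nextInt(3) + 1); // always prints 1, 2, or 3
        }
    }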