You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/28 00:02:06 UTC

incubator-tinkerpop git commit: more test cases around RDD persistence.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master f8a252202 -> 1e5b8db3d


more test cases around RDD persistence.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/1e5b8db3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/1e5b8db3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/1e5b8db3

Branch: refs/heads/master
Commit: 1e5b8db3d4b0656d294c2fa0cebc4aa658970fb6
Parents: f8a2522
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 27 16:02:00 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 27 16:02:00 2016 -0700

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/SparkGraphComputer.java    | 7 +++++--
 .../spark/structure/io/PersistedInputOutputRDDTest.java       | 5 +++--
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1e5b8db3/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 59da7d6..0419fc3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -231,12 +231,15 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // process the map reducers //
                 //////////////////////////////
                 if (!this.mapReducers.isEmpty()) {
+                    // drop all the edges of the graph as they are not used in mapReduce processing
                     computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
                         vertexWritable.get().dropEdges();
                         return vertexWritable;
                     });
-                    if (!outputToSpark && computedGraphCreated)  // if the computed graph wasn't already persisted to spark, persist it here
-                        computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));  // drop all the edges of the graph as they are not used in mapReduce processing
+                    // if the computed graph wasn't already persisted, persist it here so that all the MapReduce jobs can reuse it;
+                    // however, if there is only one MapReduce job to execute, skip persisting as it would just waste clock cycles.
+                    if (!outputToSpark && computedGraphCreated && this.mapReducers.size() > 1)
+                        computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
 
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1e5b8db3/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 6dac97f..93d83b9 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankMapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -165,7 +166,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
                 .program(TraversalVertexProgram.build()
                         .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
                                 "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
+                                "g.V().count()").create(graph)).submit().get();
         ////////
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
         ///////
@@ -309,7 +310,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2);
         ////
         graph = GraphFactory.open(configuration);
-        graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
+        graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).mapReduce(PageRankMapReduce.build().create()).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
         g = graph.traversal();
         assertEquals(6l, g.V().count().next().longValue());
         assertEquals(6l, g.E().count().next().longValue());