You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 15:44:14 UTC

incubator-tinkerpop git commit: Added test cases around the persistence of RDDs. If the interceptor does not 'touch' the loadedGraphRDD, then it's still the loadedGraphRDD and thus the persistence rules for loadedGraphRDD should apply there.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1288 f0acfd751 -> 2717b281e


Added test cases around the persistence of RDDs. If the interceptor does not 'touch' the loadedGraphRDD, then it's still the loadedGraphRDD and thus the persistence rules for loadedGraphRDD should apply there.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/2717b281
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/2717b281
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/2717b281

Branch: refs/heads/TINKERPOP-1288
Commit: 2717b281ed085a365d59edc718db9a33a9266343
Parents: f0acfd7
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 09:44:03 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 09:44:03 2016 -0600

----------------------------------------------------------------------
 .../process/computer/SparkGraphComputer.java    | 10 ++---
 .../io/PersistedInputOutputRDDTest.java         | 46 ++++++++++++++++++--
 2 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2717b281/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 65e1c96..b7ec133 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -156,7 +156,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final OutputRDD outputRDD;
             final boolean filtered;
             try {
-
                 inputRDD = InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class)) ?
                         hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputRDD.class, InputRDD.class).newInstance() :
                         InputFormatRDD.class.newInstance();
@@ -243,7 +242,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 if (null != this.vertexProgram) {
                     memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                     /////////////////
-                    // if there is a registered VertexProgramInterceptor, use it to by pass the GraphComputer semantics
+                    // if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
                     if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
                         try {
                             final SparkVertexProgramInterceptor<VertexProgram> interceptor =
@@ -322,11 +321,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 }
 
                 // unpersist the loaded graph if it will not be used again (no PersistedInputRDD)
-                // if the graphRDD was loaded from Spark, but then partitioned, its a different RDD
-                if ((!skipPartitioner && (!inputFromSpark || partitioned || filtered)) && computedGraphCreated)
+                // if the graphRDD was loaded from Spark, but then partitioned or filtered, it's a different RDD
+                if ((!inputFromSpark || partitioned || filtered) && computedGraphCreated)
                     loadedGraphRDD.unpersist();
                 // unpersist the computed graph if it will not be used again (no PersistedOutputRDD)
-                if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
+                // if the computed graph is the loadedGraphRDD because it was not mutated and not unpersisted, then don't unpersist the computedGraphRDD/loadedGraphRDD
+                if (!outputToSpark || (this.persist.equals(GraphComputer.Persist.NOTHING) && loadedGraphRDD != computedGraphRDD))
                     computedGraphRDD.unpersist();
                 // delete any file system or rdd data if persist nothing
                 if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2717b281/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 3f72093..7a089b6 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -66,15 +66,18 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
         Graph graph = GraphFactory.open(configuration);
+        ///
         assertEquals(6, graph.traversal().withComputer(Computer.compute(SparkGraphComputer.class)).V().out().count().next().longValue());
-        ////////
+        assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(0, Spark.getContext().getPersistentRDDs().size());
+        //
+        assertEquals(2, graph.traversal().withComputer(Computer.compute(SparkGraphComputer.class)).V().out().out().count().next().longValue());
         assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
         assertEquals(0, Spark.getContext().getPersistentRDDs().size());
         ///////
         Spark.close();
     }
 
-
     @Test
     public void shouldPersistRDDBasedOnStorageLevel() throws Exception {
         Spark.create("local[4]");
@@ -104,7 +107,6 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
             assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel());
             assertEquals(counter, Spark.getRDDs().size());
             assertEquals(counter, Spark.getContext().getPersistentRDDs().size());
-            //System.out.println(SparkContextStorage.open().ls());
         }
         Spark.close();
     }
@@ -164,6 +166,44 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue());
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
         assertEquals(1, Spark.getContext().getPersistentRDDs().size());
+        ///////
+        graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(graph.traversal().withComputer(SparkGraphComputer.class),
+                                "gremlin-groovy",
+                                "g.V().count()").create(graph)).submit().get();
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
+        assertEquals(2, Spark.getContext().getPersistentRDDs().size());
+        ///////
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2);
+        graph = GraphFactory.open(configuration);
+        assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue());
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(1, Spark.getContext().getPersistentRDDs().size());
+        ///////
+        graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(graph.traversal().withComputer(SparkGraphComputer.class),
+                                "gremlin-groovy",
+                                "g.V().count()").create(graph)).submit().get();
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
+        assertEquals(2, Spark.getContext().getPersistentRDDs().size());
+        ///////
+        graph = GraphFactory.open(configuration);
+        assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue());
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(1, Spark.getContext().getPersistentRDDs().size());
         Spark.close();
     }