You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/28 19:29:23 UTC

incubator-tinkerpop git commit: more test cases around persisted rdds. found a minor bug when the peristed rdd was an input rdd. tweaked the Spark Applicaiton name so it looks nicer in the SparkServer UI. CTR.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master aaf2e94bc -> 93b112d40


more test cases around persisted rdds. found a minor bug when the peristed rdd was an input rdd. tweaked the Spark Applicaiton name so it looks nicer in the SparkServer UI. CTR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/93b112d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/93b112d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/93b112d4

Branch: refs/heads/master
Commit: 93b112d40a42fd47e38ed69e0311a28254475ea7
Parents: aaf2e94
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 28 11:29:12 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 28 11:29:12 2016 -0700

----------------------------------------------------------------------
 .../process/computer/SparkGraphComputer.java    | 19 +++--
 .../gremlin/spark/structure/Spark.java          |  4 +-
 .../io/PersistedInputOutputRDDTest.java         | 86 ++++++++------------
 3 files changed, 48 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/93b112d4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 0419fc3..a38db91 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -231,15 +231,16 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // process the map reducers //
                 //////////////////////////////
                 if (!this.mapReducers.isEmpty()) {
-                    // drop all the edges of the graph as they are not used in mapReduce processing
-                    computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
-                        vertexWritable.get().dropEdges();
-                        return vertexWritable;
-                    });
-                    // if the computed graph wasn't already persisted, persist it here for all the MapReduce jobs to reuse
-                    // however, if there is only one MapReduce to execute, don't bother wasting the clock cycles.
-                    if (!outputToSpark && computedGraphCreated && this.mapReducers.size() > 1)
-                        computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                    if (computedGraphCreated && !outputToSpark) {
+                        // drop all the edges of the graph as they are not used in mapReduce processing
+                        computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
+                            vertexWritable.get().dropEdges();
+                            return vertexWritable;
+                        });
+                        // if there is only one MapReduce to execute, don't bother wasting the clock cycles.
+                        if (this.mapReducers.size() > 1)
+                            computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                    }
 
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/93b112d4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
index 1c8c41b..08639f4 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
@@ -49,13 +49,13 @@ public class Spark {
     public static void create(final Configuration configuration) {
         final SparkConf sparkConf = new SparkConf();
         configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, configuration.getProperty(key).toString()));
-        sparkConf.setAppName("Spark-Gremlin Persisted Context Application");
+        sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
         CONTEXT = SparkContext.getOrCreate(sparkConf);
     }
 
     public static void create(final String master) {
         final SparkConf sparkConf = new SparkConf();
-        sparkConf.setAppName("Spark-Gremlin Persisted Context Application");
+        sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
         sparkConf.setMaster(master);
         CONTEXT = SparkContext.getOrCreate(sparkConf);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/93b112d4/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 93d83b9..8dd677f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -24,7 +24,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
@@ -38,7 +37,6 @@ import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.io.IoCore;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
@@ -81,20 +79,16 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
     public void shouldPersistRDDBasedOnStorageLevel() throws Exception {
         Spark.create("local[4]");
         int counter = 0;
-        for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER", "OFF_HEAP")) {
-            assertEquals(counter * 2, Spark.getRDDs().size());
-            assertEquals(counter * 2, Spark.getContext().getPersistentRDDs().size());
+        for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER")) {
+            assertEquals(counter * 3, Spark.getRDDs().size());
+            assertEquals(counter * 3, Spark.getContext().getPersistentRDDs().size());
             counter++;
             final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
-            final Configuration configuration = new BaseConfiguration();
-            configuration.setProperty("spark.master", "local[4]");
-            configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-            configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+            final Configuration configuration = super.getBaseConfiguration();
             configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
             configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
             configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
             configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel);
-            configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
             configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
             configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
             Graph graph = GraphFactory.open(configuration);
@@ -104,14 +98,16 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
                     .program(TraversalVertexProgram.build()
                             .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
                                     "gremlin-groovy",
-                                    "g.V()").create(graph)).submit().get();
+                                    "g.V().groupCount('m').by('name').out()").create(graph)).submit().get();
             ////////
             assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
-            assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel());
+            assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, "m")));
             assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))));
+            assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel());
+            assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getMemoryLocation(rddName, "m")).getStorageLevel());
             assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))).getStorageLevel());
-            assertEquals(counter * 2, Spark.getRDDs().size());
-            assertEquals(counter * 2, Spark.getContext().getPersistentRDDs().size());
+            assertEquals(counter * 3, Spark.getRDDs().size());
+            assertEquals(counter * 3, Spark.getContext().getPersistentRDDs().size());
             //System.out.println(SparkContextStorage.open().ls());
         }
         Spark.close();
@@ -121,14 +117,10 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
     public void shouldNotPersistRDDAcrossJobs() throws Exception {
         Spark.create("local[4]");
         final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        final Configuration configuration = super.getBaseConfiguration();
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);  // because the spark context is NOT persisted, neither is the RDD
         Graph graph = GraphFactory.open(configuration);
@@ -142,6 +134,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         ////////
         Spark.create("local[4]");
         assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(0, Spark.getContext().getPersistentRDDs().size());
         Spark.close();
     }
 
@@ -149,15 +142,12 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
     public void shouldPersistRDDAcrossJobs() throws Exception {
 
         final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
+        final Configuration configuration = super.getBaseConfiguration();
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
         Graph graph = GraphFactory.open(configuration);
         graph.compute(SparkGraphComputer.class)
@@ -167,38 +157,31 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
                         .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
                                 "gremlin-groovy",
                                 "g.V().count()").create(graph)).submit().get();
-        ////////
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("reducing"))));
+        assertEquals(2, Spark.getContext().getPersistentRDDs().size());
         ///////
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2);
         graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.NOTHING)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
+        assertEquals(6, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().out().count().next().longValue());
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("reducing"))));
+        assertEquals(2, Spark.getContext().getPersistentRDDs().size());
         Spark.close();
     }
 
     @Test
     public void testBulkLoaderVertexProgramChain() throws Exception {
         Spark.create("local[4]");
-
         final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
-        final Configuration readConfiguration = new BaseConfiguration();
-        readConfiguration.setProperty("spark.master", "local[4]");
-        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        final Configuration readConfiguration = super.getBaseConfiguration();
         readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
         readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
         readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
         Graph pageRankGraph = GraphFactory.open(readConfiguration);
         ///////////////
@@ -237,15 +220,11 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         Spark.create("local[4]");
 
         final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
-        final Configuration readConfiguration = new BaseConfiguration();
-        readConfiguration.setProperty("spark.master", "local[4]");
-        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        final Configuration readConfiguration = super.getBaseConfiguration();
         readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
         readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
         readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
         Graph pageRankGraph = GraphFactory.open(readConfiguration);
         ///////////////
@@ -282,18 +261,15 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
 
         final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, "testComplexChain", "graphRDD");
         final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, "testComplexChain", "graphRDD2");
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        final Configuration configuration = super.getBaseConfiguration();
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
 
         assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(0, Spark.getContext().getPersistentRDDs().size());
         Graph graph = GraphFactory.open(configuration);
         graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
         GraphTraversalSource g = graph.traversal();
@@ -303,6 +279,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
         ////
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(1, Spark.getContext().getPersistentRDDs().size());
         ////
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
@@ -317,7 +294,10 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
         assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
         ////
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
+        assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY)));
+        assertEquals(3, Spark.getContext().getPersistentRDDs().size());
         ////
         graph = GraphFactory.open(configuration);
         graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
@@ -327,7 +307,10 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
         assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
         ////
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
+        assertFalse(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY)));
+        assertEquals(2, Spark.getContext().getPersistentRDDs().size());
         ////
         graph = GraphFactory.open(configuration);
         graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
@@ -338,7 +321,10 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         assertEquals(0l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
         assertEquals(0l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
         ////
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
         assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
+        assertFalse(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY)));
+        assertEquals(1, Spark.getContext().getPersistentRDDs().size());
         Spark.close();
     }
 }