You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/11/02 16:52:09 UTC

[07/12] incubator-tinkerpop git commit: @RussellSpitzer has schooled me in cache()/unpersist(). I now am smart to unpersist() RDDs that are just dangling around (especailly since we now have persistent contexts). Finally, I did some refactoring of packag

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
deleted file mode 100644
index b83314a..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputOutputRDDTest {
-
-    @Test
-    public void shouldReadFromWriteToArbitraryRDD() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
deleted file mode 100644
index 3070fea..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputRDDTest {
-
-    @Test
-    public void shouldReadFromArbitraryRDD() {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
-        assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
deleted file mode 100644
index febece6..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class OutputRDDTest {
-
-    @Test
-    public void shouldWriteToArbitraryRDD() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
deleted file mode 100644
index a7d1ac0..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.IoCore;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.junit.Test;
-
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class PersistedInputOutputRDDTest {
-
-    @Test
-    public void shouldNotPersistRDDAcrossJobs() throws Exception {
-        final String rddName = "target/test-output/" + UUID.randomUUID();
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);  // because the spark context is NOT persisted, neither is the RDD
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-        ////////
-        SparkConf sparkConfiguration = new SparkConf();
-        sparkConfiguration.setAppName("shouldNotPersistRDDAcrossJobs");
-        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
-    }
-
-    @Test
-    public void shouldPersistRDDAcrossJobs() throws Exception {
-        final String rddName = "target/test-output/" + UUID.randomUUID();
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-        ////////
-        SparkConf sparkConfiguration = new SparkConf();
-        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
-        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-        assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
-        ///////
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-        graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.NOTHING)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-
-    @Test
-    public void testBulkLoaderVertexProgramChain() throws Exception {
-        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
-        final Configuration readConfiguration = new BaseConfiguration();
-        readConfiguration.setProperty("spark.master", "local[4]");
-        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph pageRankGraph = GraphFactory.open(readConfiguration);
-        ///////////////
-        final Configuration writeConfiguration = new BaseConfiguration();
-        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
-        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
-        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
-        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
-        bulkLoaderGraph.compute(SparkGraphComputer.class)
-                .persist(GraphComputer.Persist.NOTHING)
-                .workers(1)
-                .configure(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName())
-                .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName)
-                .configure(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)
-                .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null)
-                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
-                .submit().get();
-        ////
-        SparkConf sparkConfiguration = new SparkConf();
-        sparkConfiguration.setAppName("testBulkLoaderVertexProgramChain");
-        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
-        ////
-        final Graph graph = TinkerGraph.open();
-        final GraphTraversalSource g = graph.traversal();
-        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
-        assertEquals(6l, g.V().count().next().longValue());
-        assertEquals(6l, g.E().count().next().longValue());
-        assertEquals("marko", g.V().has("name", "marko").values("name").next());
-        assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
-        assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
-        assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
-        assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
new file mode 100644
index 0000000..8417d1c
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ExampleInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final List<Vertex> list = new ArrayList<>();
+        list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
+        list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
+        list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
+        list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
+        return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
new file mode 100644
index 0000000..9a9e5ef
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ExampleOutputRDD implements OutputRDD {
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        int totalAge = 0;
+        final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
+        while (iterator.hasNext()) {
+            final Vertex vertex = iterator.next().get();
+            if (vertex.label().equals("person"))
+                totalAge = totalAge + vertex.<Integer>value("age");
+        }
+        assertEquals(123, totalAge);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
new file mode 100644
index 0000000..1feeb61
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InputOutputRDDTest {
+
+    @Test
+    public void shouldReadFromWriteToArbitraryRDD() throws Exception {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
new file mode 100644
index 0000000..8d6c06e
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InputRDDTest {
+
+    @Test
+    public void shouldReadFromArbitraryRDD() {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
+        assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
new file mode 100644
index 0000000..43dcdbd
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class OutputRDDTest {
+
+    @Test
+    public void shouldWriteToArbitraryRDD() throws Exception {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
new file mode 100644
index 0000000..6ee7aaa
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PersistedInputOutputRDDTest {
+
+    @Test
+    public void shouldNotPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);  // because the spark context is NOT persisted, neither is the RDD
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldNotPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+    }
+
+    @Test
+    public void shouldPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ///////
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.NOTHING)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+
+    @Test
+    public void testBulkLoaderVertexProgramChain() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph pageRankGraph = GraphFactory.open(readConfiguration);
+        ///////////////
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        bulkLoaderGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING)
+                .workers(1)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName())
+                .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)
+                .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
+                .submit().get();
+        ////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("testBulkLoaderVertexProgramChain");
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ////
+        final Graph graph = TinkerGraph.open();
+        final GraphTraversalSource g = graph.traversal();
+        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+        assertEquals(6l, g.V().count().next().longValue());
+        assertEquals(6l, g.E().count().next().longValue());
+        assertEquals("marko", g.V().has("name", "marko").values("name").next());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+    }
+
+    @Test
+    public void testBulkLoaderVertexProgramChainWithInputOutputHelperMapping() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph pageRankGraph = GraphFactory.open(readConfiguration);
+        ///////////////
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        bulkLoaderGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING)
+                .workers(1)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
+                .submit().get();
+        ////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("testBulkLoaderVertexProgramChain");
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ////
+        final Graph graph = TinkerGraph.open();
+        final GraphTraversalSource g = graph.traversal();
+        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+        assertEquals(6l, g.V().count().next().longValue());
+        assertEquals(6l, g.E().count().next().longValue());
+        assertEquals("marko", g.V().has("name", "marko").values("name").next());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+    }
+}