Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/11/05 02:56:25 UTC

[07/50] [abbrv] incubator-tinkerpop git commit: This is a masterpiece here. PersistedXXXRDD is now a Spark RDD class where the inputLocation (outputLocation) are the names of the RDD. No HDFS is used between jobs as the graphRDD is stored in the SparkSer

This is a masterpiece here. PersistedXXXRDD is now a Spark RDD class where the inputLocation (outputLocation) are the names of the RDD. No HDFS is used between jobs as the graphRDD is stored in the SparkServer using a persisted context. Added test cases, renamed GraphComputer.config() to configure() to be consistent with the naming conventions of GraphComputer methods. Also made it a default method, as most implementations won't need it and there is no point in requiring a boilerplate "return this". Updated docs accordingly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/528ba027
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/528ba027
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/528ba027

Branch: refs/heads/TINKERPOP3-923
Commit: 528ba027a098bc722211767b11b7dc010fb2cba1
Parents: 16b5005
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 11:48:42 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 11:48:42 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   3 +-
 docs/src/implementations.asciidoc               |  32 ++--
 docs/src/the-traversal.asciidoc                 |  11 +-
 .../upgrade-release-3.1.x-incubating.asciidoc   |   8 +
 .../process/computer/GiraphGraphComputer.java   |   2 +-
 .../gremlin/process/computer/GraphComputer.java |   5 +-
 .../process/computer/GraphComputerTest.java     |   2 +-
 .../tinkerpop/gremlin/hadoop/Constants.java     |   3 -
 .../process/computer/SparkGraphComputer.java    |  78 ++++-----
 .../process/computer/io/PersistedInputRDD.java  |  60 +++++++
 .../process/computer/io/PersistedOutputRDD.java |  41 +++++
 .../process/computer/util/SparkHelper.java      |  49 ------
 .../io/PersistedInputOutputRDDTest.java         | 168 +++++++++++++++++++
 .../io/SparkContextPersistenceTest.java         | 126 --------------
 .../process/computer/TinkerGraphComputer.java   |   5 -
 15 files changed, 347 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 8bd6e51..7ee21ad 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,8 +25,9 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Added `PersistedInputRDD` and `PersistedOutputRDD` which enables `SparkGraphComputer` to store the graph RDD in the context between jobs.
 * Renamed the `public static String` configuration variable names of TinkerGraph (deprecated old variables).
-* Added `GraphComputer.config(key,value)` to allow engine-specific configurations.
+* Added `GraphComputer.configure(key,value)` to allow engine-specific configurations.
 * `GraphStep` is no longer in the `sideEffect`-package and is now in `map`-package (breaking change).
 * Added suppport for mid-traversal `V()`-steps (`GraphStep` semantics updated).
 * Fixed `Number` handling in `Operator` enums. Prior this change a lot of operations on mixed `Number` types returned a wrong result (wrong data type).

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/docs/src/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/implementations.asciidoc b/docs/src/implementations.asciidoc
index 90ade3f..8110c5f 100644
--- a/docs/src/implementations.asciidoc
+++ b/docs/src/implementations.asciidoc
@@ -1196,11 +1196,16 @@ image::spark-algorithm.png[width=775]
 |gremlin.spark.persistContext |Whether to create a new `SparkContext` for every `SparkGraphComputer` or to reuse an existing one.
 |========================================================
 
-IMPORTANT: If the provider/user wishes to not use Hadoop `InputFormats`, it is possible to leverage Spark's RDD
+If the provider/user wishes to not use Hadoop `InputFormats`, it is possible to leverage Spark's RDD
 constructs directly. There is a `gremlin.spark.graphInputRDD` configuration that references a `Class<? extends
 InputRDD>`. An `InputRDD` provides a read method that takes a `SparkContext` and returns a graphRDD. Likewise, use
 `gremlin.spark.graphOutputRDD` and the respective `OutputRDD`.
 
+It is possible to persist the graph RDD between jobs within the `SparkContext` (e.g. SparkServer) by leveraging `PersistedOutputRDD`.
+Note that `gremlin.spark.persistContext` should be set to `true` or else the persisted RDD will be destroyed when the `SparkContext` closes.
+The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration (i.e. named in `SparkContext.getPersistentRDDs()`).
+Finally, `PersistedInputRDD` is used with respective  `gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`.
+
 Loading with BulkLoaderVertexProgram
 ++++++++++++++++++++++++++++++++++++
 
@@ -1211,14 +1216,14 @@ Grateful Dead graph from HadoopGraph into TinkerGraph over Spark:
 [gremlin-groovy]
 ----
 hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
-wgConf = 'conf/tinkergraph-gryo.properties'
-grateful = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
+readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
+writeGraph = 'conf/tinkergraph-gryo.properties'
 blvp = BulkLoaderVertexProgram.build().
            keepOriginalIds(false).
-           writeGraph(wgConf).create(grateful)
-grateful.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
+           writeGraph(writeGraph).create(readGraph)
+readGraph.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
 :set max-iteration 10
-graph = GraphFactory.open(wgConf)
+graph = GraphFactory.open(writeGraph)
 g = graph.traversal()
 g.V().valueMap()
 graph.close()
@@ -1233,7 +1238,6 @@ graph.close()
 #
 gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
 gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
-gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
 gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 gremlin.hadoop.inputLocation=data/grateful-dead.kryo
 gremlin.hadoop.outputLocation=output
@@ -1245,7 +1249,7 @@ gremlin.hadoop.jarsInDistributedCache=true
 #
 spark.master=local[1]
 spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
 ----
 
 [source,properties]
@@ -1257,7 +1261,7 @@ gremlin.tinkergraph.graphFormat=gryo
 gremlin.tinkergraph.graphLocation=/tmp/tinkergraph.kryo
 ----
 
-NOTE: The path to TinkerGraph needs to be included in the `HADOOP_GREMLIN_LIBS` for the above example to work.
+IMPORTANT: The path to TinkerGraph jars needs to be included in the `HADOOP_GREMLIN_LIBS` for the above example to work.
 
 [[giraphgraphcomputer]]
 GiraphGraphComputer
@@ -1341,14 +1345,14 @@ the Grateful Dead graph from HadoopGraph into TinkerGraph over Giraph:
 [gremlin-groovy]
 ----
 hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
-wgConf = 'conf/tinkergraph-gryo.properties'
-grateful = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
+readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
+writeGraph = 'conf/tinkergraph-gryo.properties'
 blvp = BulkLoaderVertexProgram.build().
            keepOriginalIds(false).
-           writeGraph(wgConf).create(grateful)
-grateful.compute(GiraphGraphComputer).workers(1).program(blvp).submit().get()
+           writeGraph(writeGraph).create(readGraph)
+readGraph.compute(GiraphGraphComputer).workers(1).program(blvp).submit().get()
 :set max-iteration 10
-graph = GraphFactory.open(wgConf)
+graph = GraphFactory.open(writeGraph)
 g = graph.traversal()
 g.V().valueMap()
 graph.close()

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/docs/src/the-traversal.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/the-traversal.asciidoc b/docs/src/the-traversal.asciidoc
index 42365fa..74e859e 100644
--- a/docs/src/the-traversal.asciidoc
+++ b/docs/src/the-traversal.asciidoc
@@ -2174,12 +2174,9 @@ public final class TinkerGraphStepStrategy extends AbstractTraversalStrategy<Tra
         if (traversal.getEngine().isComputer())
             return;
 
-        final Step<?, ?> startStep = traversal.getStartStep();
-        if (startStep instanceof GraphStep) {
-            final GraphStep<?> originalGraphStep = (GraphStep) startStep;
-            final TinkerGraphStep<?> tinkerGraphStep = new TinkerGraphStep<>(originalGraphStep);
-            TraversalHelper.replaceStep(startStep, (Step) tinkerGraphStep, traversal);
-
+        TraversalHelper.getStepsOfClass(GraphStep.class, traversal).forEach(originalGraphStep -> {
+            final TinkerGraphStep<?,?> tinkerGraphStep = new TinkerGraphStep<>(originalGraphStep);
+            TraversalHelper.replaceStep(originalGraphStep, (Step) tinkerGraphStep, traversal);
             Step<?, ?> currentStep = tinkerGraphStep.getNextStep();
             while (currentStep instanceof HasContainerHolder) {
                 ((HasContainerHolder) currentStep).getHasContainers().forEach(tinkerGraphStep::addHasContainer);
@@ -2187,7 +2184,7 @@ public final class TinkerGraphStepStrategy extends AbstractTraversalStrategy<Tra
                 traversal.removeStep(currentStep);
                 currentStep = currentStep.getNextStep();
             }
-        }
+        });
     }
 
     public static TinkerGraphStepStrategy instance() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/docs/src/upgrade-release-3.1.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade-release-3.1.x-incubating.asciidoc b/docs/src/upgrade-release-3.1.x-incubating.asciidoc
index b50dd7e..77a448f 100644
--- a/docs/src/upgrade-release-3.1.x-incubating.asciidoc
+++ b/docs/src/upgrade-release-3.1.x-incubating.asciidoc
@@ -164,3 +164,11 @@ The `VendorOptimizationStrategy` has been renamed to `ProviderOptimizationStrate
 with revised terminology for what were formerly referred to as "vendors".
 
 See link:https://issues.apache.org/jira/browse/TINKERPOP3-876[TINKERPOP3-876] for more information.
+
+GraphComputer Updates
++++++++++++++++++++++
+
+`GraphComputer.configure(String key, Object value)` is now a method. This allows the user to specify engine-specific
+parameters to the underlying OLAP system. These parameters are not intended to be cross engine supported. Moreover, if
+there are not parameters that can be altered (beyond the standard `GraphComputer` methods), then the provider's `GraphComputer`
+implementation should simply return and do nothing.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 1e5dbd8..4c33220 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -93,7 +93,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
-    public GraphComputer config(final String key, final Object value) {
+    public GraphComputer configure(final String key, final Object value) {
         this.giraphConfiguration.set(key, value.toString());
         this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
         return this;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index bfb7bde..547af9e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -108,12 +108,15 @@ public interface GraphComputer {
      * Typically, the other fluent methods in {@link GraphComputer} should be used to configure the computation.
      * However, for some custom configuration in the underlying engine, this method should be used.
      * Different GraphComputer implementations will have different key/values and thus, parameters placed here are generally not universal to all GraphComputer implementations.
+     * The default implementation simply does nothing and returns the {@link GraphComputer} unchanged.
      *
      * @param key   the key of the configuration
      * @param value the value of the configuration
      * @return the updated GraphComputer with newly set key/value configuration
      */
-    public GraphComputer config(final String key, final Object value);
+    public default GraphComputer configure(final String key, final Object value) {
+        return this;
+    }
 
     /**
      * Submit the {@link VertexProgram} and the set of {@link MapReduce} jobs for execution by the {@link GraphComputer}.
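
The effect of making configure() a default method can be illustrated outside TinkerPop. The following is only a minimal sketch: `Computer`, `PlainComputer`, and `ConfigurableComputer` are hypothetical stand-ins invented for illustration, not TinkerPop classes. An implementation with no engine-specific settings inherits the no-op and stays fluent, while one that has settings overrides it.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigureDefaultSketch {

    // Hypothetical, stripped-down stand-in for GraphComputer; only configure() is modeled.
    interface Computer {
        default Computer configure(final String key, final Object value) {
            return this; // nothing to configure: ignore the pair and keep the call chain fluent
        }
    }

    // An engine with no custom settings does not need to mention configure() at all.
    static final class PlainComputer implements Computer {
    }

    // An engine with custom settings overrides the default and records the pair.
    static final class ConfigurableComputer implements Computer {
        final Map<String, Object> settings = new HashMap<>();

        @Override
        public Computer configure(final String key, final Object value) {
            this.settings.put(key, value);
            return this;
        }
    }

    public static void main(final String[] args) {
        final Computer plain = new PlainComputer();
        // The default returns the same instance, so chaining still works.
        System.out.println(plain.configure("some.key", 1) == plain);

        final ConfigurableComputer custom = new ConfigurableComputer();
        custom.configure("spark.executor.memory", "1g").configure("spark.master", "local[1]");
        System.out.println(custom.settings.size());
    }
}
```

Existing implementations that already define the method (such as the Giraph and Spark ones in this commit) only need the rename; the default matters for implementations that previously had to write an empty body.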

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 0026296..acbdc81 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -127,7 +127,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public GraphComputer config(final String key, final Object value) {
+        public GraphComputer configure(final String key, final Object value) {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 7264001..469e9b0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -52,7 +52,4 @@ public final class Constants {
     public static final String GREMLIN_SPARK_GRAPH_INPUT_RDD = "gremlin.spark.graphInputRDD";
     public static final String GREMLIN_SPARK_GRAPH_OUTPUT_RDD = "gremlin.spark.graphOutputRDD";
     public static final String GREMLIN_SPARK_PERSIST_CONTEXT = "gremlin.spark.persistContext";
-    public static final String GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME = "gremlin.spark.graphInputRDD.name";
-    public static final String GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME = "gremlin.spark.graphOutputRDD.name";
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index ef2ae6f..c9e95e3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
@@ -50,7 +50,6 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
 
 import java.io.File;
 import java.io.IOException;
@@ -74,36 +73,45 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     @Override
     public GraphComputer workers(final int workers) {
         super.workers(workers);
-        if (this.sparkConfiguration.getString("spark.master").startsWith("local")) {
-            this.sparkConfiguration.setProperty("spark.master", "local[" + this.workers + "]");
+        if (this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
+            this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
         }
         return this;
     }
 
     @Override
-    public GraphComputer config(final String key, final Object value) {
+    public GraphComputer configure(final String key, final Object value) {
         this.sparkConfiguration.setProperty(key, value);
         return this;
     }
 
     @Override
-    public Future<ComputerResult> submit() {
+    protected void validateStatePriorToExecution() {
         super.validateStatePriorToExecution();
+        if (this.sparkConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD) && this.sparkConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT))
+            this.logger.warn("Both " + Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD + " and " + Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT + " were specified, ignoring " + Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT);
+        if (this.sparkConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD) && this.sparkConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
+            this.logger.warn("Both " + Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD + " and " + Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT + " were specified, ignoring " + Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT);
+    }
+
+    @Override
+    public Future<ComputerResult> submit() {
+        this.validateStatePriorToExecution();
         // apache and hadoop configurations that are used throughout the graph computer computation
         final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
         apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
         final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
-        if (null == hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
-            if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-                try {
-                    final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-                    hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
+        if (hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, null) == null && // if an InputRDD is specified, then ignore InputFormat
+                hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, null) != null &&
+                FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+            try {
+                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+                hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
             }
-        } // else WARN that both an INPUT_FORMAT and INPUT_RDD_NAME were provided?
+        }
 
         // create the completable future
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
@@ -111,12 +119,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             SparkMemory memory = null;
             // delete output location
             final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-            if (null != outputLocation) {
-                try {
+            try {
+                if (null != outputLocation && FileSystem.get(hadoopConfiguration).exists(new Path(outputLocation)))
                     FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
             }
             // wire up a spark context
             final SparkConf sparkConfiguration = new SparkConf();
@@ -132,21 +139,14 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 this.loadJars(sparkContext, hadoopConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 final JavaPairRDD<Object, VertexWritable> graphRDD;
-                if (null != sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
-                    if (!SparkHelper.getPersistedRDD(sparkContext, sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME)).isPresent())
-                        throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null));
-                    final JavaRDD rdd = SparkHelper.getPersistedRDD(sparkContext, sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME)).get().toJavaRDD();
-                    graphRDD = JavaPairRDD.fromJavaRDD(rdd).cache();
-                } else {
-                    try {
-                        graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
-                                .newInstance()
-                                .readGraphRDD(apacheConfiguration, sparkContext)
-                                .setName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "graphRDD"))
-                                .cache();
-                    } catch (final InstantiationException | IllegalAccessException e) {
-                        throw new IllegalStateException(e.getMessage(), e);
-                    }
+                try {
+                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+                            .newInstance()
+                            .readGraphRDD(apacheConfiguration, sparkContext)
+                            .setName(sparkConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "graphRDD"))
+                            .cache();
+                } catch (final InstantiationException | IllegalAccessException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
                 }
 
                 JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
@@ -176,7 +176,9 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         }
                     }
                     // write the graph rdd using the output rdd
-                    if (!this.persist.equals(GraphComputer.Persist.NOTHING)) {
+                    if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
+                            hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) &&
+                            !this.persist.equals(GraphComputer.Persist.NOTHING)) {
                         try {
                             hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
                                     .newInstance()
@@ -201,7 +203,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         mapReduce.storeState(newApacheConfiguration);
                         // map
                         final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
-                        // combine TODO: is this really needed
+                        // combine TODO: is this really needed?
                         // reduce
                         final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
                         // write the map reduce output back to disk (memory)
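
The input-precedence rule introduced in this file (an InputRDD, when configured, wins over a Hadoop InputFormat, and a warning is logged when both are present) can be sketched with a plain map. This is an illustrative simplification, not the SparkGraphComputer code: the class and method names are hypothetical, a `Map` stands in for the configuration objects, and the additional `FileInputFormat` assignability check from the diff is omitted.

```java
import java.util.HashMap;
import java.util.Map;

public class InputPrecedenceSketch {

    // Key names as they appear in Constants.java and in the example properties file.
    static final String INPUT_RDD = "gremlin.spark.graphInputRDD";
    static final String INPUT_FORMAT = "gremlin.hadoop.graphInputFormat";

    // True when both keys are present, which is the case reported as a warning.
    static boolean bothSpecified(final Map<String, String> conf) {
        return conf.containsKey(INPUT_RDD) && conf.containsKey(INPUT_FORMAT);
    }

    // True when the Hadoop InputFormat is the one consulted:
    // no InputRDD is configured and an InputFormat is.
    static boolean usesInputFormat(final Map<String, String> conf) {
        return !conf.containsKey(INPUT_RDD) && conf.containsKey(INPUT_FORMAT);
    }

    public static void main(final String[] args) {
        final Map<String, String> conf = new HashMap<>();
        conf.put(INPUT_FORMAT, "GryoInputFormat");
        System.out.println(usesInputFormat(conf)); // only the InputFormat is set

        conf.put(INPUT_RDD, "PersistedInputRDD");
        System.out.println(bothSpecified(conf));   // both set: the warning case
        System.out.println(usesInputFormat(conf)); // the InputFormat is now ignored
    }
}
```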

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
new file mode 100644
index 0000000..ad521b3
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final String inputRDDName = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
+        if (null == inputRDDName)
+            throw new IllegalArgumentException(PersistedInputRDD.class.getSimpleName() + " requires " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " in order to retrieve the named graphRDD from the SparkContext");
+        if (!PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).isPresent())
+            throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + inputRDDName);
+        return JavaPairRDD.fromJavaRDD((JavaRDD) PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).get().toJavaRDD());
+    }
+
+    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
+        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
+                getPersistentRDDs().
+                toList().iterator();
+        while (iterator.hasNext()) {
+            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
+            if (tuple2._2().toString().contains(rddName))
+                return Optional.of(tuple2._2());
+        }
+        return Optional.empty();
+    }
+}
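
One property of getPersistedRDD() above is worth noting: it matches with `toString().contains(rddName)`, so the lookup is a substring match rather than an exact name match. The sketch below imitates that loop with plain strings to show the consequence. It is not Spark code: the map stands in for the persisted-RDD registry, and the description strings are an assumed format chosen only so that they contain the name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class PersistedNameLookupSketch {

    // Hypothetical stand-in for iterating the persisted RDDs: id -> description string.
    // Returns the first description that contains the requested name.
    static Optional<String> findByContains(final Map<Integer, String> persisted, final String name) {
        for (final String description : persisted.values()) {
            if (description.contains(name))
                return Optional.of(description);
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        // Insertion order is kept here so the outcome is deterministic;
        // no particular iteration order is assumed for the real registry.
        final Map<Integer, String> persisted = new LinkedHashMap<>();
        persisted.put(1, "graph-backup MapPartitionsRDD[1]");
        persisted.put(2, "graph MapPartitionsRDD[2]");

        // "graph" is a substring of "graph-backup", so the first entry is returned.
        System.out.println(findByContains(persisted, "graph").get());

        // A name that appears nowhere yields an empty Optional.
        System.out.println(findByContains(persisted, "missing").isPresent());
    }
}
```

Under these assumptions, choosing input and output locations that are not substrings of one another avoids retrieving the wrong RDD.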

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
new file mode 100644
index 0000000..1832ca3
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedOutputRDD implements OutputRDD {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
+
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+            LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
deleted file mode 100644
index da57fb0..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.util;
-
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
-import scala.Tuple2;
-import scala.collection.Iterator;
-
-import java.util.Optional;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkHelper {
-
-    private SparkHelper() {
-
-    }
-
-    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
-        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
-                getPersistentRDDs().
-                toList().iterator();
-        while (iterator.hasNext()) {
-            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
-            if (tuple2._2().toString().contains(rddName))
-                return Optional.of(tuple2._2());
-        }
-        return Optional.empty();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
new file mode 100644
index 0000000..332d80d
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PersistedInputOutputRDDTest {
+
+    @Test
+    public void shouldNotPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);  // because the spark context is NOT persisted, neither is the RDD
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldNotPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+    }
+
+    @Test
+    public void shouldPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ///////
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.NOTHING)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+
+    @Test
+    public void testBulkLoaderVertexProgramChain() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph pageRankGraph = GraphFactory.open(readConfiguration);
+        ///////////////
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        bulkLoaderGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING)
+                .workers(1)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName())
+                .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)
+                .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
+                .submit().get();
+        ////
+        final Graph graph = TinkerGraph.open();
+        final GraphTraversalSource g = graph.traversal();
+        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+
+        assertEquals(6l, g.V().count().next().longValue());
+        assertEquals(6l, g.E().count().next().longValue());
+        assertEquals("marko", g.V().has("name", "marko").values("name").next());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+    }
+}
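
Reviewer note on the tests above: the second job in shouldPersistRDDAcrossJobs() reuses the first job's configuration, points the input location at the RDD name the first job wrote to, switches the input RDD class to PersistedInputRDD, and clears the output settings. That hand-off can be sketched without Spark as a plain map transformation. The class ChainedJobConfigSketch and its nextJob() helper are hypothetical, and the key strings are assumed values for the Constants fields used in the tests; check them against Constants before relying on them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: a plain Map stands in for the commons
// Configuration object used in the tests.
final class ChainedJobConfigSketch {

    // Assumed string values of the Constants fields referenced in the tests.
    static final String INPUT_LOCATION = "gremlin.hadoop.inputLocation";
    static final String OUTPUT_LOCATION = "gremlin.hadoop.outputLocation";
    static final String INPUT_RDD = "gremlin.spark.graphInputRDD";
    static final String OUTPUT_RDD = "gremlin.spark.graphOutputRDD";
    static final String PERSIST_CONTEXT = "gremlin.spark.persistContext";

    static final String PERSISTED_INPUT_RDD =
            "org.apache.tinkerpop.gremlin.spark.process.computer.io.PersistedInputRDD";

    // Derives the configuration of a follow-up job that reads the RDD the
    // previous job persisted under its output location name.
    public static Map<String, Object> nextJob(final Map<String, Object> previous) {
        final Object rddName = previous.get(OUTPUT_LOCATION);
        if (null == rddName)
            throw new IllegalArgumentException("The previous job did not name an output RDD");
        // Without a persisted context the named RDD is gone once the job ends.
        if (!Boolean.TRUE.equals(previous.get(PERSIST_CONTEXT)))
            throw new IllegalStateException("The previous job did not persist its context");
        final Map<String, Object> next = new HashMap<>(previous);
        next.put(INPUT_RDD, PERSISTED_INPUT_RDD);
        next.put(INPUT_LOCATION, rddName);
        next.remove(OUTPUT_RDD);
        next.remove(OUTPUT_LOCATION);
        return next;
    }

    public static void main(final String[] args) {
        final Map<String, Object> first = new HashMap<>();
        first.put(OUTPUT_LOCATION, "target/test-output/example-name");
        first.put(PERSIST_CONTEXT, true);
        System.out.println(nextJob(first));
    }
}
```

The two guard clauses reflect the conditions the tests exercise: shouldNotPersistRDDAcrossJobs() shows that the named RDD is absent when the context is not persisted, so a follow-up job has nothing to read.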

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
deleted file mode 100644
index cc0957f..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.IoCore;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class SparkContextPersistenceTest {
-
-    @Test
-    public void shouldPersistRDDAcrossJobs() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.NOTHING)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-        ////////
-        SparkConf sparkConfiguration = new SparkConf();
-        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
-        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-        assertTrue(SparkHelper.getPersistedRDD(sparkContext, "a-random-name-for-testing").isPresent());
-        ///////
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing");
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null);
-        graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.NOTHING)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-
-    @Test
-    public void testBulkLoaderVertexProgramChain() throws Exception {
-        final Configuration readConfiguration = new BaseConfiguration();
-        readConfiguration.setProperty("spark.master", "local[4]");
-        readConfiguration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
-        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph graph = GraphFactory.open(readConfiguration);
-
-        ///////////////
-        final Configuration writeConfiguration = new BaseConfiguration();
-        writeConfiguration.setProperty(Graph.GRAPH, "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph");
-        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
-        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
-        final Graph secondGraph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(graph)).submit().get().graph();
-        secondGraph.compute(SparkGraphComputer.class)
-                .persist(GraphComputer.Persist.NOTHING)
-                .workers(1)
-                .config(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing")
-                .config(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null)
-                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(secondGraph))
-                .submit().get();
-        final Graph finalGraph = TinkerGraph.open();
-        finalGraph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
-        assertEquals(6l, finalGraph.traversal().V().count().next().longValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 7092f99..07ad0c8 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -92,11 +92,6 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     @Override
-    public GraphComputer config(final String key, final Object value) {
-        return this;
-    }
-
-    @Override
     public Future<ComputerResult> submit() {
         // a graph computer can only be executed once
         if (this.executed)