You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/12 20:01:17 UTC

incubator-tinkerpop git commit: HadoopGraph is smart about toString() with RDDs. Did some cleanup/simplification to SparkGraphComputer. Made it so we don't get the WARN messages for having both RDD and Formats declared in test suite by updated the SparkH

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1073 [created] ac08360e7


HadoopGraph is smart about toString() with RDDs. Did some cleanup/simplification to SparkGraphComputer. Made it so we don't get the WARN messages for having both RDD and Formats declared in test suite by updated the SparkHadoopGraphProvider. Fixed up a bunch of awkward issues around double declaration of input_format and input_rdd. I think in the future (3.2.0) we should just have ONE declaration -- InputFormat that is for both RDDs and Formats.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ac08360e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ac08360e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ac08360e

Branch: refs/heads/TINKERPOP-1073
Commit: ac08360e72be41a0519ab1da0dace9c628d4f040
Parents: 1d36fea
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 12 12:01:05 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 12 12:01:05 2016 -0700

----------------------------------------------------------------------
 .../computer/AbstractHadoopGraphComputer.java   | 28 ++++++------
 .../hadoop/structure/HadoopConfiguration.java   |  4 +-
 .../gremlin/hadoop/structure/HadoopGraph.java   | 19 +++++---
 .../gremlin/hadoop/HadoopGraphProvider.java     |  1 +
 .../hadoop/structure/HadoopGraphTest.java       | 48 ++++++++++++++++++++
 .../process/computer/SparkGraphComputer.java    | 15 +++---
 .../spark/structure/io/InputOutputHelper.java   | 10 ++--
 .../spark/structure/io/PersistedOutputRDD.java  |  9 +++-
 .../computer/SparkHadoopGraphProvider.java      |  6 ++-
 .../spark/structure/io/ToyGraphInputRDD.java    | 12 ++---
 10 files changed, 112 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 1553240..02d23f6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -18,13 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer;
 
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
@@ -161,18 +156,21 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
 
         @Override
         public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
-            if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT)) {
-                final OutputFormat<NullWritable, VertexWritable> outputFormat = ReflectionUtils.newInstance(hadoopGraph.configuration().getGraphOutputFormat(), ConfUtil.makeHadoopConfiguration(hadoopGraph.configuration()));
-                if (outputFormat instanceof PersistResultGraphAware)
-                    return ((PersistResultGraphAware) outputFormat).supportsResultGraphPersistCombination(resultGraph, persist);
-                else {
-                    logger.warn(outputFormat.getClass() + " does not implement " + PersistResultGraphAware.class.getSimpleName() + " and thus, persistence options are unknown -- assuming all options are possible");
-                    return true;
+            final String outputClass = hadoopGraph.configuration().containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD) ?
+                    hadoopGraph.configuration().getString(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD) :
+                    hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT) ?
+                            hadoopGraph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT) : null;
+            if (null != outputClass) {
+                try {
+                    final Object outputObject = Class.forName(outputClass).newInstance();
+                    if (outputObject instanceof PersistResultGraphAware)
+                        return ((PersistResultGraphAware) outputObject).supportsResultGraphPersistCombination(resultGraph, persist);
+                } catch (final Exception e) {
+                    // do nothing -- unknown output class so assume everything works
                 }
-            } else {
-                logger.warn("Unknown OutputFormat class and thus, persistence options are unknown -- assuming all options are possible");
-                return true;
             }
+            logger.warn("Unknown output class and thus, persistence options are unknown -- assuming all options are possible");
+            return true;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index 244ead9..2453829 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -84,7 +84,9 @@ public final class HadoopConfiguration extends AbstractConfiguration implements
 
     public Class<InputFormat<NullWritable, VertexWritable>> getGraphInputFormat() {
         try {
-            return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT));
+            return this.containsKey(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD) ?
+                    (Class) Class.forName("org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat") :
+                    (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT));
         } catch (final ClassNotFoundException e) {
             throw new RuntimeException(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 22f42f4..7273a6e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -235,14 +235,21 @@ public final class HadoopGraph implements Graph {
         return this.configuration;
     }
 
+    @Override
     public String toString() {
         final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.configuration);
-        final String fromString = this.configuration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT) ?
-                hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class).getSimpleName() :
-                "no-input";
-        final String toString = this.configuration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT) ?
-                hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class).getSimpleName() :
-                "no-output";
+        final String fromString =
+                this.configuration.containsKey(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD) ?
+                        hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, Object.class).getSimpleName() :
+                        this.configuration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT) ?
+                                hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class).getSimpleName() :
+                                "no-input";
+        final String toString =
+                this.configuration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD) ?
+                        hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Object.class).getSimpleName() :
+                        this.configuration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT) ?
+                                hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class).getSimpleName() :
+                                "no-output";
         return StringFactory.graphString(this, fromString.toLowerCase() + "->" + toString.toLowerCase());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 90f5132..2e79bad 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -108,6 +108,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
             put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
             put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
             put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+            if (null != loadGraphWith) put("gremlin.hadoop.loadGraphWith", loadGraphWith.name().toLowerCase());
         }};
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphTest.java
new file mode 100644
index 0000000..16ded44
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class HadoopGraphTest {
+
+    @Test
+    public void shouldHaveProperToString() {
+        // the overwrites are what important not that these are legal RDD and Format classes
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GraphSONOutputFormat.class.getCanonicalName());
+        assertTrue(HadoopGraph.open(configuration).toString().contains((GryoInputFormat.class.getSimpleName() + "->" + GraphSONOutputFormat.class.getSimpleName()).toLowerCase()));
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, String.class.getCanonicalName());
+        assertTrue(HadoopGraph.open(configuration).toString().contains((String.class.getSimpleName() + "->" + GraphSONOutputFormat.class.getSimpleName()).toLowerCase()));
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Long.class.getCanonicalName());
+        assertTrue(HadoopGraph.open(configuration).toString().contains((String.class.getSimpleName() + "->" + Long.class.getSimpleName()).toLowerCase()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index a87f95f..40cc923 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -55,6 +55,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
 
@@ -107,7 +108,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     @Override
     public Future<ComputerResult> submit() {
         this.validateStatePriorToExecution();
-
         return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "SparkSubmitter");
     }
 
@@ -194,7 +194,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         }
                     }
                     // write the graph rdd using the output rdd
-                    final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
+                    final String[] elementComputeKeys = this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
                     graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys);
                     if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
                             hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) &&
@@ -244,12 +244,15 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // unpersist the graphRDD if it will no longer be used
                 if (!PersistedOutputRDD.class.equals(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)) || this.persist.equals(GraphComputer.Persist.NOTHING)) {
                     graphRDD.unpersist();
-                    if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
+                }
+                // remove all output rdds and files if they will no longer be used
+                if (this.persist.equals(GraphComputer.Persist.NOTHING) && apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)) {
+                    if (PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedInputRDD.class)))
                         SparkContextStorage.open().rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
+                    // delete any file system output if persist nothing
+                    if (FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, FileInputFormat.class)))
+                        FileSystemStorage.open(hadoopConfiguration).rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
                 }
-                // delete any file system output if persist nothing
-                if (FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, FileInputFormat.class)) && this.persist.equals(GraphComputer.Persist.NOTHING))
-                    FileSystemStorage.open(hadoopConfiguration).rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
index 4753028..0beb748 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
@@ -67,10 +67,12 @@ public final class InputOutputHelper {
             final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
             final BaseConfiguration newConfiguration = new BaseConfiguration();
             newConfiguration.copy(org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper.getOutputGraph(configuration, resultGraph, persist).configuration());
-            if (resultGraph.equals(GraphComputer.ResultGraph.NEW) && hadoopConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD)) {
-                newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
-                //newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputRDDFormat.class.getCanonicalName());
-                newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputOutputHelper.getInputFormat((Class) Class.forName(hadoopConfiguration.getString(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD))).getCanonicalName());
+            if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
+                if (newConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD)) {
+                    newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputOutputHelper.getInputFormat((Class) Class.forName(hadoopConfiguration.getString(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD))).getCanonicalName());
+                } else {
+                    newConfiguration.clearProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD);
+                }
             }
             return HadoopGraph.open(newConfiguration);
         } catch (final ClassNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
index 4ae6248..b1b8b46 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -23,7 +23,9 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.PersistResultGraphAware;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -35,7 +37,7 @@ import java.util.Iterator;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class PersistedOutputRDD implements OutputRDD {
+public final class PersistedOutputRDD implements OutputRDD, PersistResultGraphAware {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
 
@@ -70,4 +72,9 @@ public final class PersistedOutputRDD implements OutputRDD {
         Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
         return IteratorUtils.map(memoryRDD.toLocalIterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
     }
+
+    @Override
+    public boolean supportsResultGraphPersistCombination(final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 108d0ed..c09185b 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -25,8 +25,9 @@ import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
 import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -48,12 +49,15 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
         config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);  // this makes the test suite go really fast
         if (!test.equals(FileSystemStorageCheck.class) && null != loadGraphWith && RANDOM.nextBoolean()) {
             config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName());
+            config.remove(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT);
         }
 
         // tests persisted RDDs
         if (test.equals(SparkContextStorageCheck.class)) {
             config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName());
             config.put(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+            config.remove(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT);
+            config.remove(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT);
         }
         /// spark configuration
         config.put("spark.master", "local[4]");

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
index ea3636f..677e6d0 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
@@ -22,7 +22,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -46,15 +45,16 @@ public final class ToyGraphInputRDD implements InputRDD {
 
     @Override
     public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final String loadGraphWith = configuration.getString("gremlin.hadoop.loadGraphWith", "nothing");
         HadoopPools.initialize(TinkerGraph.open().configuration());
         final List<VertexWritable> vertices;
-        if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("modern"))
+        if (loadGraphWith.contains("modern"))
             vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new));
-        else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("classic"))
+        else if (loadGraphWith.contains("classic"))
             vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(), VertexWritable::new));
-        else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("crew"))
+        else if (loadGraphWith.contains("crew"))
             vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new));
-        else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("grateful")) {
+        else if (loadGraphWith.contains("grateful")) {
             try {
                 final Graph graph = TinkerGraph.open();
                 final GraphReader reader = GryoReader.build().mapper(graph.io(GryoIo.build()).mapper().create()).create();
@@ -66,7 +66,7 @@ public final class ToyGraphInputRDD implements InputRDD {
                 throw new IllegalStateException(e.getMessage(), e);
             }
         } else
-            throw new IllegalArgumentException("No legal toy graph was provided to load: " + configuration.getProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+            throw new IllegalArgumentException("No legal toy graph was provided to load: " + loadGraphWith);
 
         return sparkContext.parallelize(vertices).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
     }