You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/11/02 16:52:03 UTC

[01/12] incubator-tinkerpop git commit: added SparkHelper to grab RDDs from the SparkContext.getPersistedRDDs(). A simple test case proves it works. A more involved test case using BulkLoaderVertexProgram is needed.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 19b7ae0fc -> aadd42212


added SparkHelper to grab RDDs from the SparkContext.getPersistedRDDs(). A simple test case proves it works. A more involved test case using BulkLoaderVertexProgram is needed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/82bbc59e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/82bbc59e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/82bbc59e

Branch: refs/heads/master
Commit: 82bbc59e676a49ccafe54567735b320df84d60f7
Parents: 9d8da87
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Oct 27 15:03:55 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Oct 27 15:03:55 2015 -0600

----------------------------------------------------------------------
 .../tinkerpop/gremlin/hadoop/Constants.java     |   2 +
 .../process/computer/SparkGraphComputer.java    |  29 +++--
 .../process/computer/util/SparkHelper.java      |  49 +++++++++
 .../io/SparkContextPersistenceTest.java         | 107 +++++++++++++++++++
 4 files changed, 178 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/82bbc59e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 48c5e37..7264001 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -52,5 +52,7 @@ public final class Constants {
     public static final String GREMLIN_SPARK_GRAPH_INPUT_RDD = "gremlin.spark.graphInputRDD";
     public static final String GREMLIN_SPARK_GRAPH_OUTPUT_RDD = "gremlin.spark.graphOutputRDD";
     public static final String GREMLIN_SPARK_PERSIST_CONTEXT = "gremlin.spark.persistContext";
+    public static final String GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME = "gremlin.spark.graphInputRDD.name";
+    public static final String GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME = "gremlin.spark.graphOutputRDD.name";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/82bbc59e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 7cfb176..3a5a8a7 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
@@ -49,6 +50,7 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
 
 import java.io.File;
 import java.io.IOException;
@@ -118,15 +120,23 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 this.loadJars(sparkContext, hadoopConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 final JavaPairRDD<Object, VertexWritable> graphRDD;
-                try {
-                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
-                            .newInstance()
-                            .readGraphRDD(apacheConfiguration, sparkContext)
-                            .setName("graphRDD")
-                            .cache();
-                } catch (final InstantiationException | IllegalAccessException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
+                if (null != sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
+                    if (!SparkHelper.getPersistedRDD(sparkContext, sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME)).isPresent())
+                        throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null));
+                    final JavaRDD rdd = SparkHelper.getPersistedRDD(sparkContext, sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME)).get().toJavaRDD();
+                    graphRDD = JavaPairRDD.fromJavaRDD(rdd).cache();
+                } else {
+                    try {
+                        graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+                                .newInstance()
+                                .readGraphRDD(apacheConfiguration, sparkContext)
+                                .setName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "graphRDD"))
+                                .cache();
+                    } catch (final InstantiationException | IllegalAccessException e) {
+                        throw new IllegalStateException(e.getMessage(), e);
+                    }
                 }
+
                 JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
 
                 ////////////////////////////////
@@ -186,11 +196,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                     }
                 }
+
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                if (sparkContext != null && !hadoopGraph.configuration().getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                if (sparkContext != null && !this.hadoopGraph.configuration().getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     sparkContext.stop();
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/82bbc59e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
new file mode 100644
index 0000000..da57fb0
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.util;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkHelper {
+
+    private SparkHelper() {
+
+    }
+
+    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
+        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
+                getPersistentRDDs().
+                toList().iterator();
+        while (iterator.hasNext()) {
+            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
+            if (tuple2._2().toString().contains(rddName))
+                return Optional.of(tuple2._2());
+        }
+        return Optional.empty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/82bbc59e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
new file mode 100644
index 0000000..87bbf2b
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkContextPersistenceTest {
+
+    @Test
+    public void shouldPersistRDDAcrossJobs() throws Exception {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.NOTHING)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertTrue(SparkHelper.getPersistedRDD(sparkContext, "a-random-name-for-testing").isPresent());
+        ///////
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing");
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null);
+        graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.NOTHING)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+
+    @Test
+    public void testBulkLoaderVertexProgramChain() throws Exception {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(configuration);
+
+        //graph.compute().program(PageRankVertexProgram.build().create(graph)).submit().get().graph().compute().workers(1).program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(conf).create(graph)).submit().get()
+    }
+}


[09/12] incubator-tinkerpop git commit: Testing to make sure InputOutputHelper translator works for registered RDDs.

Posted by ok...@apache.org.
Testing to make sure InputOutputHelper translator works for registered RDDs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/cf0c4562
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/cf0c4562
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/cf0c4562

Branch: refs/heads/master
Commit: cf0c4562d4b992343080e9b5cb5f38fae33c0c3e
Parents: cb97238
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 14:37:43 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 14:37:43 2015 -0600

----------------------------------------------------------------------
 .../gremlin/spark/structure/io/InputOutputHelper.java  |  2 +-
 .../gremlin/spark/structure/io/ExampleInputRDD.java    | 13 ++++++++-----
 .../gremlin/spark/structure/io/ExampleOutputRDD.java   |  6 +++++-
 3 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cf0c4562/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
index b1e1b0a..1e279de 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
@@ -68,7 +68,7 @@ public final class InputOutputHelper {
             final BaseConfiguration newConfiguration = new BaseConfiguration();
             newConfiguration.copy(org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper.getOutputGraph(configuration, resultGraph, persist).configuration());
             if (resultGraph.equals(GraphComputer.ResultGraph.NEW) && hadoopConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD)) {
-                newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputOutputHelper.getInputFormat((Class) Class.forName(hadoopConfiguration.getProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD).toString())).getCanonicalName());
+                newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputOutputHelper.getInputFormat((Class) Class.forName(hadoopConfiguration.getString(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD))).getCanonicalName());
                 if (newConfiguration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, "").endsWith("/" + Constants.HIDDEN_G)) {  // Spark RDDs are not namespaced the same as Hadoop
                     newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, newConfiguration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).replace("/" + Constants.HIDDEN_G, ""));
                 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cf0c4562/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
index 8417d1c..86c7610 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
@@ -22,7 +22,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
@@ -36,13 +35,17 @@ import java.util.List;
  */
 public final class ExampleInputRDD implements InputRDD {
 
+    static {
+        InputOutputHelper.registerInputOutputPair(ExampleInputRDD.class, ExampleOutputRDD.class);
+    }
+
     @Override
     public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
         final List<Vertex> list = new ArrayList<>();
-        list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
-        list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
-        list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
-        list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
+        list.add(StarGraph.open().addVertex(T.id, 1l, T.label, "person", "age", 29));
+        list.add(StarGraph.open().addVertex(T.id, 2l, T.label, "person", "age", 27));
+        list.add(StarGraph.open().addVertex(T.id, 4l, T.label, "person", "age", 32));
+        list.add(StarGraph.open().addVertex(T.id, 6l, T.label, "person", "age", 35));
         return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cf0c4562/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
index 9a9e5ef..103ec20 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import java.util.Iterator;
@@ -32,6 +31,11 @@ import static org.junit.Assert.assertEquals;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class ExampleOutputRDD implements OutputRDD {
+
+    static {
+        InputOutputHelper.registerInputOutputPair(ExampleInputRDD.class, ExampleOutputRDD.class);
+    }
+
     @Override
     public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
         int totalAge = 0;


[03/12] incubator-tinkerpop git commit: merged master/.

Posted by ok...@apache.org.
merged master/.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/16b50052
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/16b50052
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/16b50052

Branch: refs/heads/master
Commit: 16b50052eb27ebb365341dc3b6e90608b07a71e6
Parents: 3d2d6a6 9300485
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Oct 29 17:29:02 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Oct 29 17:29:02 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   4 +
 CONTRIBUTING.asciidoc                           | 246 +----------------
 RELEASE.asciidoc                                | 172 ------------
 docs/src/developer-contributing.asciidoc        | 272 +++++++++++++++++++
 docs/src/developer-meetings.asciidoc            |  64 +++++
 docs/src/developer-release.asciidoc             | 171 ++++++++++++
 docs/src/developer.asciidoc                     |  31 +++
 docs/src/gremlin-applications.asciidoc          |   3 +-
 docs/src/the-traversal.asciidoc                 |  25 ++
 .../upgrade-release-3.1.x-incubating.asciidoc   |  65 ++++-
 docs/static/images/gremlin-standing-strong.png  | Bin 0 -> 30985 bytes
 .../process/computer/GiraphGraphComputer.java   |   7 +
 .../GephiTraversalVisualizationStrategy.groovy  |   2 +-
 .../gremlin/process/computer/GraphComputer.java |  16 +-
 .../traversal/TraversalVertexProgram.java       |   4 +-
 .../traversal/dsl/graph/GraphTraversal.java     |   7 +-
 .../dsl/graph/GraphTraversalSource.java         |  10 +-
 .../gremlin/process/traversal/dsl/graph/__.java |   9 +-
 .../process/traversal/step/map/GraphStep.java   | 149 ++++++++++
 .../traversal/step/sideEffect/GraphStep.java    | 101 -------
 .../step/util/TraversalComparator.java          |   5 +-
 .../strategy/decoration/ConnectiveStrategy.java |   3 +-
 .../strategy/decoration/ElementIdStrategy.java  |   2 +-
 .../strategy/decoration/PartitionStrategy.java  |   2 +-
 .../strategy/decoration/SubgraphStrategy.java   |   2 +-
 .../finalization/LazyBarrierStrategy.java       |   2 +-
 .../ComputerVerificationStrategy.java           |  39 ++-
 .../process/traversal/util/TraversalHelper.java |   3 +
 .../traversal/step/map/GraphStepTest.java       |  41 +++
 .../ElementIdStrategyTraverseTest.java          |   4 +-
 .../PartitionStrategyTraverseTest.java          |   4 +-
 .../SubgraphStrategyTraverseTest.java           |   5 +-
 .../traversal/step/map/GroovyGraphTest.groovy   |  50 ++++
 .../traversal/step/map/GroovyOrderTest.groovy   |  10 +
 .../process/GroovyProcessComputerSuite.java     |   2 +
 .../process/GroovyProcessStandardSuite.java     |   2 +
 .../handler/NioGremlinBinaryRequestDecoder.java |   5 +
 .../handler/WsGremlinBinaryRequestDecoder.java  |   5 +
 .../handler/WsGremlinCloseRequestDecoder.java   |   5 +
 .../handler/WsGremlinTextRequestDecoder.java    |   5 +
 .../server/GremlinDriverIntegrateTest.java      |  72 ++++-
 .../server/GremlinServerIntegrateTest.java      | 100 +++++++
 .../gremlin/process/ProcessComputerSuite.java   |   2 +
 .../gremlin/process/ProcessStandardSuite.java   |   2 +
 .../process/computer/GraphComputerTest.java     |   5 +
 .../traversal/step/map/AddVertexTest.java       |   2 +-
 .../process/traversal/step/map/GraphTest.java   | 109 ++++++++
 .../process/traversal/step/map/OrderTest.java   |  58 +++-
 .../step/sideEffect/Neo4jGraphStep.java         |  10 +-
 .../optimization/Neo4jGraphStepStrategy.java    |  13 +-
 .../gremlin/neo4j/structure/Neo4jGraph.java     |   1 +
 .../traversal/strategy/Neo4jStrategySuite.java  |  44 +++
 .../traversal/strategy/Neo4jStrategyTest.java   |  32 +++
 .../Neo4jGraphStepStrategyTest.java             |  76 ++++++
 pom.xml                                         |  34 ++-
 .../process/computer/SparkGraphComputer.java    |  16 +-
 .../io/SparkContextPersistenceTest.java         |  14 +-
 .../process/computer/TinkerGraphComputer.java   |   5 +
 .../step/sideEffect/TinkerGraphStep.java        |  10 +-
 .../optimization/TinkerGraphStepStrategy.java   |  13 +-
 .../tinkergraph/structure/TinkerFactory.java    |   8 +-
 .../tinkergraph/structure/TinkerGraph.java      |  59 +++-
 .../tinkergraph/TinkerGraphProvider.java        |  14 +-
 .../strategy/TinkerGraphStrategySuite.java      |  44 +++
 .../strategy/TinkerGraphStrategyTest.java       |  32 +++
 .../TinkerGraphStepStrategyTest.java            |  33 ++-
 .../structure/TinkerGraphIdManagerTest.java     |  18 +-
 .../structure/TinkerGraphPlayTest.java          |  11 +
 .../tinkergraph/structure/TinkerGraphTest.java  |  16 +-
 69 files changed, 1751 insertions(+), 656 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/16b50052/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 7134f26,c3593c1..ef2ae6f
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@@ -80,20 -88,17 +90,20 @@@ public final class SparkGraphComputer e
      public Future<ComputerResult> submit() {
          super.validateStatePriorToExecution();
          // apache and hadoop configurations that are used throughout the graph computer computation
-         final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
 -        this.sparkConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
 -        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.sparkConfiguration);
 -        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
 -            try {
 -                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
 -                this.sparkConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
 -                hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
 -            } catch (final IOException e) {
 -                throw new IllegalStateException(e.getMessage(), e);
++        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
 +        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
 +        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
 +        if (null == hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
 +            if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
 +                try {
 +                    final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
 +                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
 +                    hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
 +                } catch (final IOException e) {
 +                    throw new IllegalStateException(e.getMessage(), e);
 +                }
              }
 -        }
 +        } // else WARN that both an INPUT_FORMAT and INPUT_RDD_NAME were provided?
  
          // create the completable future
          return CompletableFuture.<ComputerResult>supplyAsync(() -> {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/16b50052/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
index e19d89f,0000000..cc0957f
mode 100644,000000..100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
@@@ -1,126 -1,0 +1,126 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.tinkerpop.gremlin.spark.process.computer.io;
 +
 +import org.apache.commons.configuration.BaseConfiguration;
 +import org.apache.commons.configuration.Configuration;
 +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 +import org.apache.spark.SparkConf;
 +import org.apache.spark.SparkContext;
 +import org.apache.spark.api.java.JavaSparkContext;
 +import org.apache.tinkerpop.gremlin.hadoop.Constants;
 +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 +import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
 +import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
 +import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 +import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
 +import org.apache.tinkerpop.gremlin.structure.Graph;
 +import org.apache.tinkerpop.gremlin.structure.io.IoCore;
- import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 +import org.junit.Test;
 +
- import static org.junit.Assert.*;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
 +
 +/**
 + * @author Marko A. Rodriguez (http://markorodriguez.com)
 + */
 +public class SparkContextPersistenceTest {
 +
 +    @Test
 +    public void shouldPersistRDDAcrossJobs() throws Exception {
 +        final Configuration configuration = new BaseConfiguration();
 +        configuration.setProperty("spark.master", "local[4]");
 +        configuration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
 +        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
 +        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
 +        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
 +        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
 +        Graph graph = GraphFactory.open(configuration);
 +        graph.compute(SparkGraphComputer.class)
 +                .result(GraphComputer.ResultGraph.NEW)
 +                .persist(GraphComputer.Persist.NOTHING)
 +                .program(TraversalVertexProgram.build()
 +                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
 +                                "gremlin-groovy",
 +                                "g.V()").create(graph)).submit().get();
 +        ////////
 +        SparkConf sparkConfiguration = new SparkConf();
 +        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
 +        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
 +        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
 +        assertTrue(SparkHelper.getPersistedRDD(sparkContext, "a-random-name-for-testing").isPresent());
 +        ///////
 +        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing");
 +        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null);
 +        graph = GraphFactory.open(configuration);
 +        graph.compute(SparkGraphComputer.class)
 +                .result(GraphComputer.ResultGraph.NEW)
 +                .persist(GraphComputer.Persist.NOTHING)
 +                .program(TraversalVertexProgram.build()
 +                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
 +                                "gremlin-groovy",
 +                                "g.V()").create(graph)).submit().get();
 +    }
 +
 +    @Test
 +    public void testBulkLoaderVertexProgramChain() throws Exception {
 +        final Configuration readConfiguration = new BaseConfiguration();
 +        readConfiguration.setProperty("spark.master", "local[4]");
 +        readConfiguration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
 +        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
 +        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
 +        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
 +        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
 +        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
 +        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
 +        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
 +        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
 +        Graph graph = GraphFactory.open(readConfiguration);
 +
 +        ///////////////
 +        final Configuration writeConfiguration = new BaseConfiguration();
 +        writeConfiguration.setProperty(Graph.GRAPH, "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph");
-         writeConfiguration.setProperty(TinkerGraph.CONFIG_GRAPH_FORMAT, "gryo");
-         writeConfiguration.setProperty(TinkerGraph.CONFIG_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
++        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
++        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
 +        final Graph secondGraph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(graph)).submit().get().graph();
-         secondGraph.configuration().setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing");
-         secondGraph.configuration().setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null);
 +        secondGraph.compute(SparkGraphComputer.class)
 +                .persist(GraphComputer.Persist.NOTHING)
 +                .workers(1)
++                .config(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing")
++                .config(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null)
 +                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(secondGraph))
 +                .submit().get();
 +        final Graph finalGraph = TinkerGraph.open();
 +        finalGraph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
-         assertEquals(6l,finalGraph.traversal().V().count().next().longValue());
++        assertEquals(6l, finalGraph.traversal().V().count().next().longValue());
 +    }
 +}


[06/12] incubator-tinkerpop git commit: forgot to remove this when figuring out how unpersist() works. Sorry for the slop.

Posted by ok...@apache.org.
forgot to remove this when figuring out how unpersist() works. Sorry for the slop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/eeb0d9fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/eeb0d9fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/eeb0d9fa

Branch: refs/heads/master
Commit: eeb0d9fa7c975f0d7d3d820bba21f75211f3a129
Parents: 1f6c574
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 12:37:39 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 12:37:39 2015 -0600

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/io/OutputFormatRDD.java          | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/eeb0d9fa/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
index 661dc51..56a1297 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
@@ -45,6 +45,5 @@ public final class OutputFormatRDD implements OutputRDD {
                             VertexWritable.class,
                             (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
         }
-        graphRDD.unpersist();
     }
 }
\ No newline at end of file


[04/12] incubator-tinkerpop git commit: This is a masterpiece here. PersistedXXXRDD is now a Spark RDD class where the inputLocation (outputLocation) are the names of the RDD. No HDFS is used between jobs as the graphRDD is stored in the SparkServer usin

Posted by ok...@apache.org.
This is a masterpiece here. PersistedXXXRDD is now a Spark RDD class where the inputLocation (outputLocation) are the names of the RDD. No HDFS is used between jobs as the graphRDD is stored in the SparkServer using a persisted context. Added test cases, renamed GraphComputer.config() to configure() to be consistent with the naming conventions of GraphComputer methods. Also made it default as most implementaitons won't need it and there is no point to require a random return this. Updated docs accordingly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/528ba027
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/528ba027
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/528ba027

Branch: refs/heads/master
Commit: 528ba027a098bc722211767b11b7dc010fb2cba1
Parents: 16b5005
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 11:48:42 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 11:48:42 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   3 +-
 docs/src/implementations.asciidoc               |  32 ++--
 docs/src/the-traversal.asciidoc                 |  11 +-
 .../upgrade-release-3.1.x-incubating.asciidoc   |   8 +
 .../process/computer/GiraphGraphComputer.java   |   2 +-
 .../gremlin/process/computer/GraphComputer.java |   5 +-
 .../process/computer/GraphComputerTest.java     |   2 +-
 .../tinkerpop/gremlin/hadoop/Constants.java     |   3 -
 .../process/computer/SparkGraphComputer.java    |  78 ++++-----
 .../process/computer/io/PersistedInputRDD.java  |  60 +++++++
 .../process/computer/io/PersistedOutputRDD.java |  41 +++++
 .../process/computer/util/SparkHelper.java      |  49 ------
 .../io/PersistedInputOutputRDDTest.java         | 168 +++++++++++++++++++
 .../io/SparkContextPersistenceTest.java         | 126 --------------
 .../process/computer/TinkerGraphComputer.java   |   5 -
 15 files changed, 347 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 8bd6e51..7ee21ad 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,8 +25,9 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Added `PersistedInputRDD` and `PersistedOutputRDD` which enables `SparkGraphComputer` to store the graph RDD in the context between jobs.
 * Renamed the `public static String` configuration variable names of TinkerGraph (deprecated old variables).
-* Added `GraphComputer.config(key,value)` to allow engine-specific configurations.
+* Added `GraphComputer.configure(key,value)` to allow engine-specific configurations.
 * `GraphStep` is no longer in the `sideEffect`-package and is now in `map`-package (breaking change).
 * Added suppport for mid-traversal `V()`-steps (`GraphStep` semantics updated).
 * Fixed `Number` handling in `Operator` enums. Prior this change a lot of operations on mixed `Number` types returned a wrong result (wrong data type).

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/docs/src/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/implementations.asciidoc b/docs/src/implementations.asciidoc
index 90ade3f..8110c5f 100644
--- a/docs/src/implementations.asciidoc
+++ b/docs/src/implementations.asciidoc
@@ -1196,11 +1196,16 @@ image::spark-algorithm.png[width=775]
 |gremlin.spark.persistContext |Whether to create a new `SparkContext` for every `SparkGraphComputer` or to reuse an existing one.
 |========================================================
 
-IMPORTANT: If the provider/user wishes to not use Hadoop `InputFormats`, it is possible to leverage Spark's RDD
+If the provider/user wishes to not use Hadoop `InputFormats`, it is possible to leverage Spark's RDD
 constructs directly. There is a `gremlin.spark.graphInputRDD` configuration that references a `Class<? extends
 InputRDD>`. An `InputRDD` provides a read method that takes a `SparkContext` and returns a graphRDD. Likewise, use
 `gremlin.spark.graphOutputRDD` and the respective `OutputRDD`.
 
+It is possible to persist the graph RDD between jobs within the `SparkContext` (e.g. SparkServer) by leveraging `PersistedOutputRDD`.
+Note that `gremlin.spark.persistContext` should be set to `true` or else the persisted RDD will be destroyed when the `SparkContext` closes.
+The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration (i.e. named in `SparkContext.getPersistedRDDs()`).
+Finally, `PersistedInputRDD` is used with respective  `gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`.
+
 Loading with BulkLoaderVertexProgram
 ++++++++++++++++++++++++++++++++++++
 
@@ -1211,14 +1216,14 @@ Grateful Dead graph from HadoopGraph into TinkerGraph over Spark:
 [gremlin-groovy]
 ----
 hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
-wgConf = 'conf/tinkergraph-gryo.properties'
-grateful = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
+readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
+writeGraph = 'conf/tinkergraph-gryo.properties'
 blvp = BulkLoaderVertexProgram.build().
            keepOriginalIds(false).
-           writeGraph(wgConf).create(grateful)
-grateful.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
+           writeGraph(writeGraph).create(readGraph)
+readGraph.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
 :set max-iteration 10
-graph = GraphFactory.open(wgConf)
+graph = GraphFactory.open(writeGraph)
 g = graph.traversal()
 g.V().valueMap()
 graph.close()
@@ -1233,7 +1238,6 @@ graph.close()
 #
 gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
 gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
-gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
 gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 gremlin.hadoop.inputLocation=data/grateful-dead.kryo
 gremlin.hadoop.outputLocation=output
@@ -1245,7 +1249,7 @@ gremlin.hadoop.jarsInDistributedCache=true
 #
 spark.master=local[1]
 spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
 ----
 
 [source,properties]
@@ -1257,7 +1261,7 @@ gremlin.tinkergraph.graphFormat=gryo
 gremlin.tinkergraph.graphLocation=/tmp/tinkergraph.kryo
 ----
 
-NOTE: The path to TinkerGraph needs to be included in the `HADOOP_GREMLIN_LIBS` for the above example to work.
+IMPORTANT: The path to TinkerGraph jars needs to be included in the `HADOOP_GREMLIN_LIBS` for the above example to work.
 
 [[giraphgraphcomputer]]
 GiraphGraphComputer
@@ -1341,14 +1345,14 @@ the Grateful Dead graph from HadoopGraph into TinkerGraph over Giraph:
 [gremlin-groovy]
 ----
 hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
-wgConf = 'conf/tinkergraph-gryo.properties'
-grateful = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
+readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
+writeGraph = 'conf/tinkergraph-gryo.properties'
 blvp = BulkLoaderVertexProgram.build().
            keepOriginalIds(false).
-           writeGraph(wgConf).create(grateful)
-grateful.compute(GiraphGraphComputer).workers(1).program(blvp).submit().get()
+           writeGraph(writeGraph).create(readGraph)
+readGraph.compute(GiraphGraphComputer).workers(1).program(blvp).submit().get()
 :set max-iteration 10
-graph = GraphFactory.open(wgConf)
+graph = GraphFactory.open(writeGraph)
 g = graph.traversal()
 g.V().valueMap()
 graph.close()

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/docs/src/the-traversal.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/the-traversal.asciidoc b/docs/src/the-traversal.asciidoc
index 42365fa..74e859e 100644
--- a/docs/src/the-traversal.asciidoc
+++ b/docs/src/the-traversal.asciidoc
@@ -2174,12 +2174,9 @@ public final class TinkerGraphStepStrategy extends AbstractTraversalStrategy<Tra
         if (traversal.getEngine().isComputer())
             return;
 
-        final Step<?, ?> startStep = traversal.getStartStep();
-        if (startStep instanceof GraphStep) {
-            final GraphStep<?> originalGraphStep = (GraphStep) startStep;
-            final TinkerGraphStep<?> tinkerGraphStep = new TinkerGraphStep<>(originalGraphStep);
-            TraversalHelper.replaceStep(startStep, (Step) tinkerGraphStep, traversal);
-
+        TraversalHelper.getStepsOfClass(GraphStep.class, traversal).forEach(originalGraphStep -> {
+            final TinkerGraphStep<?,?> tinkerGraphStep = new TinkerGraphStep<>(originalGraphStep);
+            TraversalHelper.replaceStep(originalGraphStep, (Step) tinkerGraphStep, traversal);
             Step<?, ?> currentStep = tinkerGraphStep.getNextStep();
             while (currentStep instanceof HasContainerHolder) {
                 ((HasContainerHolder) currentStep).getHasContainers().forEach(tinkerGraphStep::addHasContainer);
@@ -2187,7 +2184,7 @@ public final class TinkerGraphStepStrategy extends AbstractTraversalStrategy<Tra
                 traversal.removeStep(currentStep);
                 currentStep = currentStep.getNextStep();
             }
-        }
+        });
     }
 
     public static TinkerGraphStepStrategy instance() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/docs/src/upgrade-release-3.1.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade-release-3.1.x-incubating.asciidoc b/docs/src/upgrade-release-3.1.x-incubating.asciidoc
index b50dd7e..77a448f 100644
--- a/docs/src/upgrade-release-3.1.x-incubating.asciidoc
+++ b/docs/src/upgrade-release-3.1.x-incubating.asciidoc
@@ -164,3 +164,11 @@ The `VendorOptimizationStrategy` has been renamed to `ProviderOptimizationStrate
 with revised terminology for what were formerly referred to as "vendors".
 
 See link:https://issues.apache.org/jira/browse/TINKERPOP3-876[TINKERPOP3-876] for more information.
+
+GraphComputer Updates
++++++++++++++++++++++
+
+`GraphComputer.configure(String key, Object value)` is now a method. This allows the user to specify engine-specific
+parameters to the underlying OLAP system. These parameters are not intended to be cross engine supported. Moreover, if
+there are not parameters that can be altered (beyond the standard `GraphComputer` methods), then the provider's `GraphComputer`
+implementation should simply return and do nothing.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 1e5dbd8..4c33220 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -93,7 +93,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     }
 
     @Override
-    public GraphComputer config(final String key, final Object value) {
+    public GraphComputer configure(final String key, final Object value) {
         this.giraphConfiguration.set(key, value.toString());
         this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
         return this;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index bfb7bde..547af9e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -108,12 +108,15 @@ public interface GraphComputer {
      * Typically, the other fluent methods in {@link GraphComputer} should be used to configure the computation.
      * However, for some custom configuration in the underlying engine, this method should be used.
      * Different GraphComputer implementations will have different key/values and thus, parameters placed here are generally not universal to all GraphComputer implementations.
+     * The default implementation simply does nothing and returns the {@link GraphComputer} unchanged.
      *
      * @param key   the key of the configuration
      * @param value the value of the configuration
      * @return the updated GraphComputer with newly set key/value configuration
      */
-    public GraphComputer config(final String key, final Object value);
+    public default GraphComputer configure(final String key, final Object value) {
+        return this;
+    }
 
     /**
      * Submit the {@link VertexProgram} and the set of {@link MapReduce} jobs for execution by the {@link GraphComputer}.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 0026296..acbdc81 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -127,7 +127,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public GraphComputer config(final String key, final Object value) {
+        public GraphComputer configure(final String key, final Object value) {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 7264001..469e9b0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -52,7 +52,4 @@ public final class Constants {
     public static final String GREMLIN_SPARK_GRAPH_INPUT_RDD = "gremlin.spark.graphInputRDD";
     public static final String GREMLIN_SPARK_GRAPH_OUTPUT_RDD = "gremlin.spark.graphOutputRDD";
     public static final String GREMLIN_SPARK_PERSIST_CONTEXT = "gremlin.spark.persistContext";
-    public static final String GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME = "gremlin.spark.graphInputRDD.name";
-    public static final String GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME = "gremlin.spark.graphOutputRDD.name";
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index ef2ae6f..c9e95e3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
@@ -50,7 +50,6 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
 
 import java.io.File;
 import java.io.IOException;
@@ -74,36 +73,45 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     @Override
     public GraphComputer workers(final int workers) {
         super.workers(workers);
-        if (this.sparkConfiguration.getString("spark.master").startsWith("local")) {
-            this.sparkConfiguration.setProperty("spark.master", "local[" + this.workers + "]");
+        if (this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
+            this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
         }
         return this;
     }
 
     @Override
-    public GraphComputer config(final String key, final Object value) {
+    public GraphComputer configure(final String key, final Object value) {
         this.sparkConfiguration.setProperty(key, value);
         return this;
     }
 
     @Override
-    public Future<ComputerResult> submit() {
+    protected void validateStatePriorToExecution() {
         super.validateStatePriorToExecution();
+        if (this.sparkConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD) && this.sparkConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT))
+            this.logger.warn("Both " + Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD + " and " + Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT + " were specified, ignoring " + Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT);
+        if (this.sparkConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD) && this.sparkConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
+            this.logger.warn("Both " + Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD + " and " + Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT + " were specified, ignoring " + Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT);
+    }
+
+    @Override
+    public Future<ComputerResult> submit() {
+        this.validateStatePriorToExecution();
         // apache and hadoop configurations that are used throughout the graph computer computation
         final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
         apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
         final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
-        if (null == hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
-            if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-                try {
-                    final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-                    hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
+        if (hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, null) == null && // if an InputRDD is specified, then ignore InputFormat
+                hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, null) != null &&
+                FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+            try {
+                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+                hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
             }
-        } // else WARN that both an INPUT_FORMAT and INPUT_RDD_NAME were provided?
+        }
 
         // create the completable future
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
@@ -111,12 +119,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             SparkMemory memory = null;
             // delete output location
             final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-            if (null != outputLocation) {
-                try {
+            try {
+                if (null != outputLocation && FileSystem.get(hadoopConfiguration).exists(new Path(outputLocation)))
                     FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
             }
             // wire up a spark context
             final SparkConf sparkConfiguration = new SparkConf();
@@ -132,21 +139,14 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 this.loadJars(sparkContext, hadoopConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 final JavaPairRDD<Object, VertexWritable> graphRDD;
-                if (null != sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
-                    if (!SparkHelper.getPersistedRDD(sparkContext, sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME)).isPresent())
-                        throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null));
-                    final JavaRDD rdd = SparkHelper.getPersistedRDD(sparkContext, sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME)).get().toJavaRDD();
-                    graphRDD = JavaPairRDD.fromJavaRDD(rdd).cache();
-                } else {
-                    try {
-                        graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
-                                .newInstance()
-                                .readGraphRDD(apacheConfiguration, sparkContext)
-                                .setName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "graphRDD"))
-                                .cache();
-                    } catch (final InstantiationException | IllegalAccessException e) {
-                        throw new IllegalStateException(e.getMessage(), e);
-                    }
+                try {
+                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+                            .newInstance()
+                            .readGraphRDD(apacheConfiguration, sparkContext)
+                            .setName(sparkConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "graphRDD"))
+                            .cache();
+                } catch (final InstantiationException | IllegalAccessException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
                 }
 
                 JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
@@ -176,7 +176,9 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         }
                     }
                     // write the graph rdd using the output rdd
-                    if (!this.persist.equals(GraphComputer.Persist.NOTHING)) {
+                    if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
+                            hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) &&
+                            !this.persist.equals(GraphComputer.Persist.NOTHING)) {
                         try {
                             hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
                                     .newInstance()
@@ -201,7 +203,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         mapReduce.storeState(newApacheConfiguration);
                         // map
                         final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
-                        // combine TODO: is this really needed
+                        // combine TODO: is this really needed?
                         // reduce
                         final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
                         // write the map reduce output back to disk (memory)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
new file mode 100644
index 0000000..ad521b3
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final String inputRDDName = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
+        if (null == inputRDDName)
+            throw new IllegalArgumentException(PersistedInputRDD.class.getSimpleName() + " requires " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " in order to retrieve the named graphRDD from the SparkContext");
+        if (!PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).isPresent())
+            throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + inputRDDName);
+        return JavaPairRDD.fromJavaRDD((JavaRDD) PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).get().toJavaRDD());
+    }
+
+    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
+        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
+                getPersistentRDDs().
+                toList().iterator();
+        while (iterator.hasNext()) {
+            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
+            if (tuple2._2().toString().contains(rddName))
+                return Optional.of(tuple2._2());
+        }
+        return Optional.empty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
new file mode 100644
index 0000000..1832ca3
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedOutputRDD implements OutputRDD {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
+
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+            LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
deleted file mode 100644
index da57fb0..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/util/SparkHelper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.util;
-
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
-import scala.Tuple2;
-import scala.collection.Iterator;
-
-import java.util.Optional;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkHelper {
-
-    private SparkHelper() {
-
-    }
-
-    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
-        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
-                getPersistentRDDs().
-                toList().iterator();
-        while (iterator.hasNext()) {
-            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
-            if (tuple2._2().toString().contains(rddName))
-                return Optional.of(tuple2._2());
-        }
-        return Optional.empty();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
new file mode 100644
index 0000000..332d80d
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PersistedInputOutputRDDTest {
+
+    @Test
+    public void shouldNotPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);  // because the spark context is NOT persisted, neither is the RDD
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldNotPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+    }
+
+    @Test
+    public void shouldPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ///////
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.NOTHING)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+
+    @Test
+    public void testBulkLoaderVertexProgramChain() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph pageRankGraph = GraphFactory.open(readConfiguration);
+        ///////////////
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        bulkLoaderGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING)
+                .workers(1)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName())
+                .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)
+                .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
+                .submit().get();
+        ////
+        final Graph graph = TinkerGraph.open();
+        final GraphTraversalSource g = graph.traversal();
+        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+
+        assertEquals(6l, g.V().count().next().longValue());
+        assertEquals(6l, g.E().count().next().longValue());
+        assertEquals("marko", g.V().has("name", "marko").values("name").next());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
deleted file mode 100644
index cc0957f..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.IoCore;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class SparkContextPersistenceTest {
-
-    @Test
-    public void shouldPersistRDDAcrossJobs() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.NOTHING)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-        ////////
-        SparkConf sparkConfiguration = new SparkConf();
-        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
-        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-        assertTrue(SparkHelper.getPersistedRDD(sparkContext, "a-random-name-for-testing").isPresent());
-        ///////
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing");
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null);
-        graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.NOTHING)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-
-    @Test
-    public void testBulkLoaderVertexProgramChain() throws Exception {
-        final Configuration readConfiguration = new BaseConfiguration();
-        readConfiguration.setProperty("spark.master", "local[4]");
-        readConfiguration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
-        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph graph = GraphFactory.open(readConfiguration);
-
-        ///////////////
-        final Configuration writeConfiguration = new BaseConfiguration();
-        writeConfiguration.setProperty(Graph.GRAPH, "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph");
-        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
-        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
-        final Graph secondGraph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(graph)).submit().get().graph();
-        secondGraph.compute(SparkGraphComputer.class)
-                .persist(GraphComputer.Persist.NOTHING)
-                .workers(1)
-                .config(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing")
-                .config(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null)
-                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(secondGraph))
-                .submit().get();
-        final Graph finalGraph = TinkerGraph.open();
-        finalGraph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
-        assertEquals(6l, finalGraph.traversal().V().count().next().longValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/528ba027/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 7092f99..07ad0c8 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -92,11 +92,6 @@ public final class TinkerGraphComputer implements GraphComputer {
     }
 
     @Override
-    public GraphComputer config(final String key, final Object value) {
-        return this;
-    }
-
-    @Override
     public Future<ComputerResult> submit() {
         // a graph computer can only be executed once
         if (this.executed)


[05/12] incubator-tinkerpop git commit: @RussellSpitzer explained how to unpersist RDDs works and recommended to drop the RDDs that are OutputFormatted (as they are in HDFS) or are not persisted with OutputRDD or Persist.NOTHING is set.

Posted by ok...@apache.org.
@RussellSpitzer explained how to unpersist RDDs works and recommended to drop the RDDs that are OutputFormatted (as they are in HDFS) or are not persisted with OutputRDD or Persist.NOTHING is set.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/1f6c5744
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/1f6c5744
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/1f6c5744

Branch: refs/heads/master
Commit: 1f6c57444e8547f0b69bf3e31494ba4ca6ca685d
Parents: 528ba02
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 12:22:01 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 12:22:01 2015 -0600

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/SparkGraphComputer.java   | 4 ++++
 .../gremlin/spark/process/computer/io/OutputFormatRDD.java   | 1 +
 .../process/computer/io/PersistedInputOutputRDDTest.java     | 8 ++++++--
 3 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1f6c5744/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index c9e95e3..0656f5a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -211,6 +211,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     }
                 }
 
+                // unpersist the graphRDD if it will no longer be used
+                if (hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) == null || this.persist.equals(GraphComputer.Persist.NOTHING)) {
+                    graphRDD.unpersist();
+                }
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1f6c5744/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
index 56a1297..661dc51 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
@@ -45,5 +45,6 @@ public final class OutputFormatRDD implements OutputRDD {
                             VertexWritable.class,
                             (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
         }
+        graphRDD.unpersist();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1f6c5744/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
index 332d80d..a7d1ac0 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
@@ -142,7 +142,7 @@ public class PersistedInputOutputRDDTest {
         writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
         writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
         writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
-        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
         bulkLoaderGraph.compute(SparkGraphComputer.class)
                 .persist(GraphComputer.Persist.NOTHING)
                 .workers(1)
@@ -153,10 +153,14 @@ public class PersistedInputOutputRDDTest {
                 .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
                 .submit().get();
         ////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("testBulkLoaderVertexProgramChain");
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ////
         final Graph graph = TinkerGraph.open();
         final GraphTraversalSource g = graph.traversal();
         graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
-
         assertEquals(6l, g.V().count().next().longValue());
         assertEquals(6l, g.E().count().next().longValue());
         assertEquals("marko", g.V().has("name", "marko").values("name").next());


[10/12] incubator-tinkerpop git commit: a small tweak to maintain the configuration as it was prior to Giraph getting its little hands on it.

Posted by ok...@apache.org.
a small tweak to maintain the configuration as it was prior to Giraph getting its little hands on it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f9d1fa72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f9d1fa72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f9d1fa72

Branch: refs/heads/master
Commit: f9d1fa726dd82d2ebee770d9d321425c91a6e268
Parents: cf0c456
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 18:26:24 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 18:26:24 2015 -0600

----------------------------------------------------------------------
 .../gremlin/giraph/process/computer/GiraphGraphComputer.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f9d1fa72/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 0389750..67e8ea3 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -114,6 +114,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
     public Future<ComputerResult> submit() {
         final long startTime = System.currentTimeMillis();
         super.validateStatePriorToExecution();
+        final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration);
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             try {
                 final FileSystem fs = FileSystem.get(this.giraphConfiguration);
@@ -126,7 +127,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
             }
 
             this.memory.setRuntime(System.currentTimeMillis() - startTime);
-            return new DefaultComputerResult(InputOutputHelper.getOutputGraph(ConfUtil.makeApacheConfiguration(this.giraphConfiguration), this.resultGraph, this.persist), this.memory.asImmutable());
+            return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), this.memory.asImmutable());
         });
     }
 


[11/12] incubator-tinkerpop git commit: merged TINKERPOP3-925

Posted by ok...@apache.org.
merged TINKERPOP3-925


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0dc2dbce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0dc2dbce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0dc2dbce

Branch: refs/heads/master
Commit: 0dc2dbce055b827a675fd4535e4760a0f5539cee
Parents: 19b7ae0 f9d1fa7
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Nov 2 08:40:50 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Nov 2 08:40:50 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   3 +-
 docs/src/implementations.asciidoc               |  36 +--
 docs/src/the-traversal.asciidoc                 |  11 +-
 .../upgrade-release-3.1.x-incubating.asciidoc   |   8 +
 .../process/computer/GiraphGraphComputer.java   |  11 +-
 .../computer/io/GiraphVertexInputFormat.java    |  70 ------
 .../computer/io/GiraphVertexOutputFormat.java   |  65 ------
 .../process/computer/io/GiraphVertexReader.java |  67 ------
 .../process/computer/io/GiraphVertexWriter.java |  57 -----
 .../structure/io/GiraphVertexInputFormat.java   |  70 ++++++
 .../structure/io/GiraphVertexOutputFormat.java  |  65 ++++++
 .../giraph/structure/io/GiraphVertexReader.java |  67 ++++++
 .../giraph/structure/io/GiraphVertexWriter.java |  57 +++++
 .../gremlin/process/computer/GraphComputer.java |   5 +-
 .../process/computer/GraphComputerTest.java     |   2 +-
 .../tinkerpop/gremlin/hadoop/Constants.java     |   1 -
 .../hadoop/structure/io/InputOutputHelper.java  |  22 ++
 .../hadoop/structure/util/HadoopHelper.java     |  50 -----
 .../process/computer/SparkGraphComputer.java    |  75 ++++---
 .../process/computer/io/InputFormatRDD.java     |  47 ----
 .../spark/process/computer/io/InputRDD.java     |  41 ----
 .../process/computer/io/OutputFormatRDD.java    |  49 -----
 .../spark/process/computer/io/OutputRDD.java    |  31 ---
 .../spark/structure/io/InputFormatRDD.java      |  47 ++++
 .../spark/structure/io/InputOutputHelper.java   |  81 +++++++
 .../gremlin/spark/structure/io/InputRDD.java    |  41 ++++
 .../spark/structure/io/OutputFormatRDD.java     |  49 +++++
 .../gremlin/spark/structure/io/OutputRDD.java   |  31 +++
 .../spark/structure/io/PersistedInputRDD.java   |  60 +++++
 .../spark/structure/io/PersistedOutputRDD.java  |  41 ++++
 .../process/computer/io/ExampleInputRDD.java    |  47 ----
 .../process/computer/io/ExampleOutputRDD.java   |  45 ----
 .../process/computer/io/InputOutputRDDTest.java |  59 -----
 .../spark/process/computer/io/InputRDDTest.java |  54 -----
 .../process/computer/io/OutputRDDTest.java      |  62 ------
 .../spark/structure/io/ExampleInputRDD.java     |  51 +++++
 .../spark/structure/io/ExampleOutputRDD.java    |  50 +++++
 .../spark/structure/io/InputOutputRDDTest.java  |  59 +++++
 .../spark/structure/io/InputRDDTest.java        |  54 +++++
 .../spark/structure/io/OutputRDDTest.java       |  62 ++++++
 .../io/PersistedInputOutputRDDTest.java         | 217 +++++++++++++++++++
 .../process/computer/TinkerGraphComputer.java   |   5 -
 42 files changed, 1217 insertions(+), 808 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0dc2dbce/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --cc CHANGELOG.asciidoc
index 06d0db4,7ee21ad..e79a0a0
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@@ -25,11 -25,11 +25,12 @@@ image::https://raw.githubusercontent.co
  TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  
 -* Added `PersistedInputRDD` and `PersistedOutputRDD` which enables `SparkGraphComputer` to store the graph RDD in the context between jobs.
 +* Bumped to Neo4j 2.3.0.
++* Added `PersistedInputRDD` and `PersistedOutputRDD` which enables `SparkGraphComputer` to store the graph RDD in the context between jobs (no HDFS serialization required).
  * Renamed the `public static String` configuration variable names of TinkerGraph (deprecated old variables).
- * Added `GraphComputer.config(key,value)` to allow engine-specific configurations.
+ * Added `GraphComputer.configure(key,value)` to allow engine-specific configurations.
  * `GraphStep` is no longer in the `sideEffect`-package and is now in `map`-package (breaking change).
 -* Added suppport for mid-traversal `V()`-steps (`GraphStep` semantics updated).
 +* Added support for mid-traversal `V()`-steps (`GraphStep` semantics updated).
  * Fixed `Number` handling in `Operator` enums. Prior this change a lot of operations on mixed `Number` types returned a wrong result (wrong data type).
  * Fixed a bug in Gremlin Server/Driver serializer where empty buffers were getting returned in certain cases.
  * Renamed `ConjunctionX` to `ConnectiveX` because "conjunction" is assumed "and" (disjunction "or"), where "connective" is the parent concept.


[07/12] incubator-tinkerpop git commit: @RussellSpitzer has schooled me in cache()/unpersist(). I now am smart to unpersist() RDDs that are just dangling around (especailly since we now have persistent contexts). Finally, I did some refactoring of packag

Posted by ok...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
deleted file mode 100644
index b83314a..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputOutputRDDTest {
-
-    @Test
-    public void shouldReadFromWriteToArbitraryRDD() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
deleted file mode 100644
index 3070fea..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputRDDTest {
-
-    @Test
-    public void shouldReadFromArbitraryRDD() {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
-        assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
deleted file mode 100644
index febece6..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class OutputRDDTest {
-
-    @Test
-    public void shouldWriteToArbitraryRDD() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
deleted file mode 100644
index a7d1ac0..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.IoCore;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.junit.Test;
-
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class PersistedInputOutputRDDTest {
-
-    @Test
-    public void shouldNotPersistRDDAcrossJobs() throws Exception {
-        final String rddName = "target/test-output/" + UUID.randomUUID();
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);  // because the spark context is NOT persisted, neither is the RDD
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-        ////////
-        SparkConf sparkConfiguration = new SparkConf();
-        sparkConfiguration.setAppName("shouldNotPersistRDDAcrossJobs");
-        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
-    }
-
-    @Test
-    public void shouldPersistRDDAcrossJobs() throws Exception {
-        final String rddName = "target/test-output/" + UUID.randomUUID();
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-        ////////
-        SparkConf sparkConfiguration = new SparkConf();
-        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
-        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-        assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
-        ///////
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-        graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.NOTHING)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-
-    @Test
-    public void testBulkLoaderVertexProgramChain() throws Exception {
-        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
-        final Configuration readConfiguration = new BaseConfiguration();
-        readConfiguration.setProperty("spark.master", "local[4]");
-        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
-        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
-        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph pageRankGraph = GraphFactory.open(readConfiguration);
-        ///////////////
-        final Configuration writeConfiguration = new BaseConfiguration();
-        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
-        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
-        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
-        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
-        bulkLoaderGraph.compute(SparkGraphComputer.class)
-                .persist(GraphComputer.Persist.NOTHING)
-                .workers(1)
-                .configure(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName())
-                .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName)
-                .configure(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)
-                .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null)
-                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
-                .submit().get();
-        ////
-        SparkConf sparkConfiguration = new SparkConf();
-        sparkConfiguration.setAppName("testBulkLoaderVertexProgramChain");
-        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
-        ////
-        final Graph graph = TinkerGraph.open();
-        final GraphTraversalSource g = graph.traversal();
-        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
-        assertEquals(6l, g.V().count().next().longValue());
-        assertEquals(6l, g.E().count().next().longValue());
-        assertEquals("marko", g.V().has("name", "marko").values("name").next());
-        assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
-        assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
-        assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
-        assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
new file mode 100644
index 0000000..8417d1c
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ExampleInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final List<Vertex> list = new ArrayList<>();
+        list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
+        list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
+        list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
+        list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
+        return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
new file mode 100644
index 0000000..9a9e5ef
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ExampleOutputRDD implements OutputRDD {
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        int totalAge = 0;
+        final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
+        while (iterator.hasNext()) {
+            final Vertex vertex = iterator.next().get();
+            if (vertex.label().equals("person"))
+                totalAge = totalAge + vertex.<Integer>value("age");
+        }
+        assertEquals(123, totalAge);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
new file mode 100644
index 0000000..1feeb61
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InputOutputRDDTest {
+
+    @Test
+    public void shouldReadFromWriteToArbitraryRDD() throws Exception {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
new file mode 100644
index 0000000..8d6c06e
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InputRDDTest {
+
+    @Test
+    public void shouldReadFromArbitraryRDD() {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
+        assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
new file mode 100644
index 0000000..43dcdbd
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class OutputRDDTest {
+
+    @Test
+    public void shouldWriteToArbitraryRDD() throws Exception {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
new file mode 100644
index 0000000..6ee7aaa
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PersistedInputOutputRDDTest {
+
+    @Test
+    public void shouldNotPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);  // because the spark context is NOT persisted, neither is the RDD
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldNotPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+    }
+
+    @Test
+    public void shouldPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ///////
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.NOTHING)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+
+    @Test
+    public void testBulkLoaderVertexProgramChain() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph pageRankGraph = GraphFactory.open(readConfiguration);
+        ///////////////
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        bulkLoaderGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING)
+                .workers(1)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName())
+                .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)
+                .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
+                .submit().get();
+        ////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("testBulkLoaderVertexProgramChain");
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ////
+        final Graph graph = TinkerGraph.open();
+        final GraphTraversalSource g = graph.traversal();
+        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+        assertEquals(6l, g.V().count().next().longValue());
+        assertEquals(6l, g.E().count().next().longValue());
+        assertEquals("marko", g.V().has("name", "marko").values("name").next());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+    }
+
+    @Test
+    public void testBulkLoaderVertexProgramChainWithInputOutputHelperMapping() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph pageRankGraph = GraphFactory.open(readConfiguration);
+        ///////////////
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        bulkLoaderGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING)
+                .workers(1)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
+                .submit().get();
+        ////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName("testBulkLoaderVertexProgramChain");
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ////
+        final Graph graph = TinkerGraph.open();
+        final GraphTraversalSource g = graph.traversal();
+        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+        assertEquals(6l, g.V().count().next().longValue());
+        assertEquals(6l, g.E().count().next().longValue());
+        assertEquals("marko", g.V().has("name", "marko").values("name").next());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+        assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+    }
+}


[08/12] incubator-tinkerpop git commit: @RussellSpitzer has schooled me in cache()/unpersist(). I now am smart to unpersist() RDDs that are just dangling around (especailly since we now have persistent contexts). Finally, I did some refactoring of packag

Posted by ok...@apache.org.
@RussellSpitzer has schooled me in cache()/unpersist(). I now am smart to unpersist() RDDs that are just dangling around (especailly since we now have persistent contexts). Finally, I did some refactoring of packages (non-breaking) as I'm starting to see the future of what SparkGraph looks like (in other words, being able to use SparkGraph w/o HadoopGraph (back SparkGraph by an RDD). Anywho, that refactor cleaned this up for now and for the future so its good.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/cb972387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/cb972387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/cb972387

Branch: refs/heads/master
Commit: cb972387bd369619a39b3ced215997703a8bb9e9
Parents: eeb0d9f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 14:30:10 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 14:30:10 2015 -0600

----------------------------------------------------------------------
 docs/src/implementations.asciidoc               |   4 +-
 .../process/computer/GiraphGraphComputer.java   |   8 +-
 .../computer/io/GiraphVertexInputFormat.java    |  70 ------
 .../computer/io/GiraphVertexOutputFormat.java   |  65 ------
 .../process/computer/io/GiraphVertexReader.java |  67 ------
 .../process/computer/io/GiraphVertexWriter.java |  57 -----
 .../structure/io/GiraphVertexInputFormat.java   |  70 ++++++
 .../structure/io/GiraphVertexOutputFormat.java  |  65 ++++++
 .../giraph/structure/io/GiraphVertexReader.java |  67 ++++++
 .../giraph/structure/io/GiraphVertexWriter.java |  57 +++++
 .../hadoop/structure/io/InputOutputHelper.java  |  22 ++
 .../hadoop/structure/util/HadoopHelper.java     |  50 -----
 .../process/computer/SparkGraphComputer.java    |  15 +-
 .../process/computer/io/InputFormatRDD.java     |  47 ----
 .../spark/process/computer/io/InputRDD.java     |  41 ----
 .../process/computer/io/OutputFormatRDD.java    |  49 -----
 .../spark/process/computer/io/OutputRDD.java    |  31 ---
 .../process/computer/io/PersistedInputRDD.java  |  60 -----
 .../process/computer/io/PersistedOutputRDD.java |  41 ----
 .../spark/structure/io/InputFormatRDD.java      |  47 ++++
 .../spark/structure/io/InputOutputHelper.java   |  81 +++++++
 .../gremlin/spark/structure/io/InputRDD.java    |  41 ++++
 .../spark/structure/io/OutputFormatRDD.java     |  49 +++++
 .../gremlin/spark/structure/io/OutputRDD.java   |  31 +++
 .../spark/structure/io/PersistedInputRDD.java   |  60 +++++
 .../spark/structure/io/PersistedOutputRDD.java  |  41 ++++
 .../process/computer/io/ExampleInputRDD.java    |  47 ----
 .../process/computer/io/ExampleOutputRDD.java   |  45 ----
 .../process/computer/io/InputOutputRDDTest.java |  59 -----
 .../spark/process/computer/io/InputRDDTest.java |  54 -----
 .../process/computer/io/OutputRDDTest.java      |  62 ------
 .../io/PersistedInputOutputRDDTest.java         | 172 ---------------
 .../spark/structure/io/ExampleInputRDD.java     |  48 ++++
 .../spark/structure/io/ExampleOutputRDD.java    |  46 ++++
 .../spark/structure/io/InputOutputRDDTest.java  |  59 +++++
 .../spark/structure/io/InputRDDTest.java        |  54 +++++
 .../spark/structure/io/OutputRDDTest.java       |  62 ++++++
 .../io/PersistedInputOutputRDDTest.java         | 217 +++++++++++++++++++
 38 files changed, 1131 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/docs/src/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/implementations.asciidoc b/docs/src/implementations.asciidoc
index 8110c5f..7792665 100644
--- a/docs/src/implementations.asciidoc
+++ b/docs/src/implementations.asciidoc
@@ -1001,8 +1001,8 @@ spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerial
 ####################################
 # SparkGraphComputer Configuration #
 ####################################
-gremlin.spark.graphInputRDD=org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDDFormat
-gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDDFormat
+gremlin.spark.graphInputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat
+gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDDFormat
 gremlin.spark.persistContext=true
 #####################################
 # GiraphGraphComputer Configuration #

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 4c33220..0389750 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -34,17 +34,17 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.io.GiraphVertexInputFormat;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.io.GiraphVertexOutputFormat;
+import org.apache.tinkerpop.gremlin.giraph.structure.io.GiraphVertexInputFormat;
+import org.apache.tinkerpop.gremlin.giraph.structure.io.GiraphVertexOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
@@ -126,7 +126,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
             }
 
             this.memory.setRuntime(System.currentTimeMillis() - startTime);
-            return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), this.memory.asImmutable());
+            return new DefaultComputerResult(InputOutputHelper.getOutputGraph(ConfUtil.makeApacheConfiguration(this.giraphConfiguration), this.resultGraph, this.persist), this.memory.asImmutable());
         });
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
deleted file mode 100644
index e5407da..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexInputFormat extends VertexInputFormat {
-
-    private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
-
-    @Override
-    public void checkInputSpecs(final Configuration configuration) {
-
-    }
-
-    @Override
-    public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return this.hadoopGraphInputFormat.getSplits(context);
-    }
-
-    @Override
-    public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
-        this.constructor(context.getConfiguration());
-        try {
-            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-    }
-
-    private final void constructor(final Configuration configuration) {
-        if (null == this.hadoopGraphInputFormat) {
-            this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
deleted file mode 100644
index c1360c7..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexOutputFormat extends VertexOutputFormat {
-
-    private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
-
-    @Override
-    public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
-    }
-
-    @Override
-    public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        this.hadoopGraphOutputFormat.checkOutputSpecs(context);
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return this.hadoopGraphOutputFormat.getOutputCommitter(context);
-    }
-
-    private final void constructor(final Configuration configuration) {
-        if (null == this.hadoopGraphOutputFormat) {
-            this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
deleted file mode 100644
index c8ed797..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexReader extends VertexReader {
-
-    private RecordReader<NullWritable, VertexWritable> recordReader;
-
-    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
-        this.recordReader = recordReader;
-    }
-
-    @Override
-    public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.recordReader.initialize(inputSplit, context);
-    }
-
-    @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-        return this.recordReader.nextKeyValue();
-    }
-
-    @Override
-    public Vertex getCurrentVertex() throws IOException, InterruptedException {
-        return new GiraphVertex(this.recordReader.getCurrentValue());
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.recordReader.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-        return this.recordReader.getProgress();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
deleted file mode 100644
index d67b736..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexWriter extends VertexWriter {
-    private final OutputFormat<NullWritable, VertexWritable> outputFormat;
-    private RecordWriter<NullWritable, VertexWritable> recordWriter;
-
-    public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
-        this.outputFormat = outputFormat;
-    }
-
-    @Override
-    public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.recordWriter = this.outputFormat.getRecordWriter(context);
-    }
-
-    @Override
-    public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.recordWriter.close(context);
-    }
-
-    @Override
-    public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
-        this.recordWriter.write(NullWritable.get(), ((GiraphVertex) vertex).getValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
new file mode 100644
index 0000000..6b6638a
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexInputFormat extends VertexInputFormat {
+
+    private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
+
+    @Override
+    public void checkInputSpecs(final Configuration configuration) {
+
+    }
+
+    @Override
+    public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        return this.hadoopGraphInputFormat.getSplits(context);
+    }
+
+    @Override
+    public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
+        this.constructor(context.getConfiguration());
+        try {
+            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private final void constructor(final Configuration configuration) {
+        if (null == this.hadoopGraphInputFormat) {
+            this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
new file mode 100644
index 0000000..2d9b4ba
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexOutputFormat extends VertexOutputFormat {
+
+    private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
+
+    @Override
+    public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
+    }
+
+    @Override
+    public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        this.hadoopGraphOutputFormat.checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        return this.hadoopGraphOutputFormat.getOutputCommitter(context);
+    }
+
+    private final void constructor(final Configuration configuration) {
+        if (null == this.hadoopGraphOutputFormat) {
+            this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
new file mode 100644
index 0000000..2a6ade6
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexReader extends VertexReader {
+
+    private RecordReader<NullWritable, VertexWritable> recordReader;
+
+    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
+        this.recordReader = recordReader;
+    }
+
+    @Override
+    public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.recordReader.initialize(inputSplit, context);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return this.recordReader.nextKeyValue();
+    }
+
+    @Override
+    public Vertex getCurrentVertex() throws IOException, InterruptedException {
+        return new GiraphVertex(this.recordReader.getCurrentValue());
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.recordReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return this.recordReader.getProgress();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
new file mode 100644
index 0000000..43c4b0b
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexWriter extends VertexWriter {
+    private final OutputFormat<NullWritable, VertexWritable> outputFormat;
+    private RecordWriter<NullWritable, VertexWritable> recordWriter;
+
+    public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
+        this.outputFormat = outputFormat;
+    }
+
+    @Override
+    public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.recordWriter = this.outputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.recordWriter.close(context);
+    }
+
+    @Override
+    public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
+        this.recordWriter.write(NullWritable.get(), ((GiraphVertex) vertex).getValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
index 8a80053..04097c1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
@@ -18,15 +18,21 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptOutputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -65,4 +71,20 @@ public final class InputOutputHelper {
         INPUT_TO_OUTPUT_CACHE.put(inputFormat, outputFormat);
         OUTPUT_TO_INPUT_CACHE.put(outputFormat, inputFormat);
     }
+
+    public static HadoopGraph getOutputGraph(final Configuration configuration, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
+        final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
+        final BaseConfiguration newConfiguration = new BaseConfiguration();
+        newConfiguration.copy(hadoopConfiguration);
+        if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
+            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "/" + Constants.HIDDEN_G);
+            if (hadoopConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
+                newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopConfiguration.getGraphOutputFormat()).getCanonicalName());
+            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "_");
+            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, persist.equals(GraphComputer.Persist.EDGES));
+        } else {
+            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "_");
+        }
+        return HadoopGraph.open(newConfiguration);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
deleted file mode 100644
index ebfdf91..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.util;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HadoopHelper {
-
-    private HadoopHelper() {
-    }
-
-    public static HadoopGraph getOutputGraph(final HadoopGraph hadoopGraph, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
-        final BaseConfiguration newConfiguration = new BaseConfiguration();
-        newConfiguration.copy(hadoopGraph.configuration());
-        if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "/" + Constants.HIDDEN_G);
-            if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
-                newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopGraph.configuration().getGraphOutputFormat()).getCanonicalName());
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "_");
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, persist.equals(GraphComputer.Persist.EDGES));
-        } else {
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "_");
-        }
-        return HadoopGraph.open(newConfiguration);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 0656f5a..b25073b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
@@ -45,11 +44,12 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
 
 import java.io.File;
 import java.io.IOException;
@@ -209,6 +209,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         // write the map reduce output back to disk (memory)
                         SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                     }
+                    mapReduceGraphRDD.unpersist();
                 }
 
                 // unpersist the graphRDD if it will no longer be used
@@ -217,9 +218,9 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 }
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
-                return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
+                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                if (sparkContext != null && !this.hadoopGraph.configuration().getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                if (sparkContext != null && !apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     sparkContext.stop();
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
deleted file mode 100644
index adb080b..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class InputFormatRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
-        return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
-                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
-                NullWritable.class,
-                VertexWritable.class)
-                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
-                .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
deleted file mode 100644
index 291fcd3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
- * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface InputRDD {
-
-    /**
-     * Read the graphRDD from the underlying graph system.
-     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
-     * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
-     * @return an adjacency list representation of the underlying graph system.
-     */
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
deleted file mode 100644
index 56a1297..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class OutputFormatRDD implements OutputRDD {
-
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null != outputLocation) {
-            // map back to a <nullwritable,vertexwritable> stream for output
-            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
-                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
-                            NullWritable.class,
-                            VertexWritable.class,
-                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
deleted file mode 100644
index 2580252..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface OutputRDD {
-
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
deleted file mode 100644
index ad521b3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import scala.Tuple2;
-import scala.collection.Iterator;
-
-import java.util.Optional;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class PersistedInputRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final String inputRDDName = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
-        if (null == inputRDDName)
-            throw new IllegalArgumentException(PersistedInputRDD.class.getSimpleName() + " requires " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " in order to retrieve the named graphRDD from the SparkContext");
-        if (!PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).isPresent())
-            throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + inputRDDName);
-        return JavaPairRDD.fromJavaRDD((JavaRDD) PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).get().toJavaRDD());
-    }
-
-    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
-        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
-                getPersistentRDDs().
-                toList().iterator();
-        while (iterator.hasNext()) {
-            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
-            if (tuple2._2().toString().contains(rddName))
-                return Optional.of(tuple2._2());
-        }
-        return Optional.empty();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
deleted file mode 100644
index 1832ca3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class PersistedOutputRDD implements OutputRDD {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
-
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
-            LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
new file mode 100644
index 0000000..f768939
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputFormatRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+        return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+                NullWritable.class,
+                VertexWritable.class)
+                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+                .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
new file mode 100644
index 0000000..b1e1b0a
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputOutputHelper {
+
+    private static Map<Class<? extends InputRDD>, Class<? extends OutputRDD>> INPUT_TO_OUTPUT_CACHE = new ConcurrentHashMap<>();
+    private static Map<Class<? extends OutputRDD>, Class<? extends InputRDD>> OUTPUT_TO_INPUT_CACHE = new ConcurrentHashMap<>();
+
+    static {
+        INPUT_TO_OUTPUT_CACHE.put(PersistedInputRDD.class, PersistedOutputRDD.class);
+        INPUT_TO_OUTPUT_CACHE.put(InputFormatRDD.class, OutputFormatRDD.class);
+        //
+        OUTPUT_TO_INPUT_CACHE.put(PersistedOutputRDD.class, PersistedInputRDD.class);
+        OUTPUT_TO_INPUT_CACHE.put(OutputFormatRDD.class, InputFormatRDD.class);
+    }
+
+    private InputOutputHelper() {
+
+    }
+
+    public static Class<? extends InputRDD> getInputFormat(final Class<? extends OutputRDD> outputRDD) {
+        return OUTPUT_TO_INPUT_CACHE.get(outputRDD);
+    }
+
+    public static Class<? extends OutputRDD> getOutputFormat(final Class<? extends InputRDD> inputRDD) {
+        return INPUT_TO_OUTPUT_CACHE.get(inputRDD);
+    }
+
+    public static void registerInputOutputPair(final Class<? extends InputRDD> inputRDD, final Class<? extends OutputRDD> outputRDD) {
+        INPUT_TO_OUTPUT_CACHE.put(inputRDD, outputRDD);
+        OUTPUT_TO_INPUT_CACHE.put(outputRDD, inputRDD);
+    }
+
+    public static HadoopGraph getOutputGraph(final Configuration configuration, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
+        try {
+            final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
+            final BaseConfiguration newConfiguration = new BaseConfiguration();
+            newConfiguration.copy(org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper.getOutputGraph(configuration, resultGraph, persist).configuration());
+            if (resultGraph.equals(GraphComputer.ResultGraph.NEW) && hadoopConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD)) {
+                newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputOutputHelper.getInputFormat((Class) Class.forName(hadoopConfiguration.getProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD).toString())).getCanonicalName());
+                if (newConfiguration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, "").endsWith("/" + Constants.HIDDEN_G)) {  // Spark RDDs are not namespaced the same as Hadoop
+                    newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, newConfiguration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).replace("/" + Constants.HIDDEN_G, ""));
+                }
+            }
+            return HadoopGraph.open(newConfiguration);
+        } catch (final ClassNotFoundException e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
new file mode 100644
index 0000000..c84c189
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
+ * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface InputRDD {
+
+    /**
+     * Read the graphRDD from the underlying graph system.
+     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
+     * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
+     * @return an adjacency list representation of the underlying graph system.
+     */
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
new file mode 100644
index 0000000..cc6ed61
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class OutputFormatRDD implements OutputRDD {
+
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a <nullwritable,vertexwritable> stream for output
+            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
+                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
+                            NullWritable.class,
+                            VertexWritable.class,
+                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
new file mode 100644
index 0000000..c2964eb
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface OutputRDD {
+
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
new file mode 100644
index 0000000..e3c27ec
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final String inputRDDName = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
+        if (null == inputRDDName)
+            throw new IllegalArgumentException(PersistedInputRDD.class.getSimpleName() + " requires " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " in order to retrieve the named graphRDD from the SparkContext");
+        if (!PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).isPresent())
+            throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + inputRDDName);
+        return JavaPairRDD.fromJavaRDD((JavaRDD) PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).get().toJavaRDD());
+    }
+
+    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
+        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
+                getPersistentRDDs().
+                toList().iterator();
+        while (iterator.hasNext()) {
+            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
+            if (tuple2._2().toString().contains(rddName))
+                return Optional.of(tuple2._2());
+        }
+        return Optional.empty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
new file mode 100644
index 0000000..abeeaaa
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedOutputRDD implements OutputRDD {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
+
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+            LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
deleted file mode 100644
index 1fb85a1..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.T;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleInputRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final List<Vertex> list = new ArrayList<>();
-        list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
-        list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
-        list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
-        list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
-        return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
deleted file mode 100644
index bb38f7f..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleOutputRDD implements OutputRDD {
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        int totalAge = 0;
-        final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
-        while (iterator.hasNext()) {
-            final Vertex vertex = iterator.next().get();
-            if (vertex.label().equals("person"))
-                totalAge = totalAge + vertex.<Integer>value("age");
-        }
-        assertEquals(123, totalAge);
-    }
-}



[02/12] incubator-tinkerpop git commit: Added a test case that verifies a PageRankVertexProgram to BulkLoaderVertexProgram load into Spark without touching HDFS. Need to do the GraphComputer.config() ticket to make this all pretty.

Posted by ok...@apache.org.
Added a test case that verifies a PageRankVertexProgram to BulkLoaderVertexProgram load into Spark without touching HDFS. Need to do the GraphComputer.config() ticket to make this all pretty.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3d2d6a69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3d2d6a69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3d2d6a69

Branch: refs/heads/master
Commit: 3d2d6a69086166ebd34ee10ade656c0e61a1ac0c
Parents: 82bbc59
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Oct 27 16:02:42 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Oct 27 16:02:42 2015 -0600

----------------------------------------------------------------------
 .../process/computer/SparkGraphComputer.java    | 18 +++----
 .../io/SparkContextPersistenceTest.java         | 49 ++++++++++++++------
 2 files changed, 44 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3d2d6a69/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 3a5a8a7..7134f26 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -83,15 +83,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
         apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
         final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
-        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-            try {
-                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-                hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
+        if (null == hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
+            if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+                try {
+                    final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+                    hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
             }
-        }
+        } // else WARN that both an INPUT_FORMAT and INPUT_RDD_NAME were provided?
 
         // create the completable future
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3d2d6a69/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
index 87bbf2b..e19d89f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
@@ -30,6 +30,8 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
@@ -37,10 +39,13 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -56,7 +61,6 @@ public class SparkContextPersistenceTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
         configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
@@ -89,19 +93,34 @@ public class SparkContextPersistenceTest {
 
     @Test
     public void testBulkLoaderVertexProgramChain() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph graph = GraphFactory.open(configuration);
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(readConfiguration);
 
-        //graph.compute().program(PageRankVertexProgram.build().create(graph)).submit().get().graph().compute().workers(1).program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(conf).create(graph)).submit().get()
+        ///////////////
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph");
+        writeConfiguration.setProperty(TinkerGraph.CONFIG_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.CONFIG_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph secondGraph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(graph)).submit().get().graph();
+        secondGraph.configuration().setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing");
+        secondGraph.configuration().setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null);
+        secondGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING)
+                .workers(1)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(secondGraph))
+                .submit().get();
+        final Graph finalGraph = TinkerGraph.open();
+        finalGraph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+        assertEquals(6l,finalGraph.traversal().V().count().next().longValue());
     }
 }


[12/12] incubator-tinkerpop git commit: tweaks to upgrade docs.

Posted by ok...@apache.org.
tweaks to upgrade docs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/aadd4221
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/aadd4221
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/aadd4221

Branch: refs/heads/master
Commit: aadd4221200be353da0cd8369c135ab9396c55cc
Parents: 0dc2dbc
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Nov 2 08:51:49 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Nov 2 08:51:49 2015 -0700

----------------------------------------------------------------------
 .../src/upgrade-release-3.1.x-incubating.asciidoc | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/aadd4221/docs/src/upgrade-release-3.1.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade-release-3.1.x-incubating.asciidoc b/docs/src/upgrade-release-3.1.x-incubating.asciidoc
index 77a448f..9e5b50d 100644
--- a/docs/src/upgrade-release-3.1.x-incubating.asciidoc
+++ b/docs/src/upgrade-release-3.1.x-incubating.asciidoc
@@ -40,7 +40,7 @@ Important Changes
 * The data type of `Operator` enums will now always be the highest common data type of the two given numbers, rather than the data type of the first number, as it's been before.
 * The Gephi Plugin has improved integration with Gephi, where manually inserting `store` steps to visualize a running traversal is no longer required.
 * Entire TinkerGraph instances can be serialized over Gryo.
-* Hadoop1 support has been dropped. Hadoop2 is now supported. Giraph 1.1.0 is now supported and Spark of Hadoop2 YARN.
+* Hadoop1 support has been dropped. Hadoop2 is now supported. Giraph and Spark can work over Hadoop2 YARN.
 * The implementation and semantics of `GraphTraversal.group()` has changed. The previous model is deprecated and renamed to `groupV3d0()`.
 * `PartitionStrategy` now supports `VertexProperty` such those properties can be added to partitions.
 * The Jackson library is now "shaded" and included in the `gremlin-shaded` module.
@@ -69,14 +69,22 @@ Hadoop-Gremlin Updates
 ++++++++++++++++++++++
 
 * Hadoop1 is no longer supported. Hadoop2 is now the only supported Hadoop version in TinkerPop.
+* Spark and Giraph have been split out of Hadoop-Gremlin into their own respective packages (Spark-Gremlin and Giraph-Gremlin).
 * The directory where application jars are stored in HDFS is now `hadoop-gremlin-x.y.z-libs`.
 ** This versioning is important so that cross-version TinkerPop use does not cause jar conflicts.
 
+Spark-Gremlin Updates
++++++++++++++++++++++
+
+* Providers that wish to reuse a graphRDD can leverage the new `PersistedInputRDD` and `PersistedOutputRDD`.
+** This allows the graphRDD to avoid serialization into HDFS for reuse. Be sure to enabled persisted `SparkContext` (see documentation).
+
 TinkerGraph-Gremlin Updates
 +++++++++++++++++++++++++++
 
 * The `public static String` configurations have been renamed. The old `public static` variables have been deprecated.
 
+
 Gremlin Process
 ^^^^^^^^^^^^^^^
 
@@ -168,7 +176,7 @@ See link:https://issues.apache.org/jira/browse/TINKERPOP3-876[TINKERPOP3-876] fo
 GraphComputer Updates
 +++++++++++++++++++++
 
-`GraphComputer.configure(String key, Object value)` is now a method. This allows the user to specify engine-specific
-parameters to the underlying OLAP system. These parameters are not intended to be cross engine supported. Moreover, if
-there are not parameters that can be altered (beyond the standard `GraphComputer` methods), then the provider's `GraphComputer`
-implementation should simply return and do nothing.
\ No newline at end of file
+`GraphComputer.configure(String key, Object value)` is now a method (with default implementation).
+This allows the user to specify engine-specific parameters to the underlying OLAP system. These parameters are not intended
+to be cross engine supported. Moreover, if there are not parameters that can be altered (beyond the standard `GraphComputer`
+methods), then the provider's `GraphComputer` implementation should simply return and do nothing.
\ No newline at end of file