You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/11/03 17:48:55 UTC

[08/14] incubator-tinkerpop git commit: @RussellSpitzer has schooled me in cache()/unpersist(). I now know to unpersist() RDDs that are just dangling around (especially since we now have persistent contexts). Finally, I did some refactoring of packages

@RussellSpitzer has schooled me in cache()/unpersist(). I now know to unpersist() RDDs that are just dangling around (especially since we now have persistent contexts). Finally, I did some (non-breaking) refactoring of packages, as I'm starting to see the future of what SparkGraph looks like: being able to use SparkGraph without HadoopGraph (i.e., backing SparkGraph directly with an RDD). Anyway, that refactor cleans things up both for now and for the future, so it's good.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/cb972387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/cb972387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/cb972387

Branch: refs/heads/TINKERPOP3-909
Commit: cb972387bd369619a39b3ced215997703a8bb9e9
Parents: eeb0d9f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Oct 30 14:30:10 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Oct 30 14:30:10 2015 -0600

----------------------------------------------------------------------
 docs/src/implementations.asciidoc               |   4 +-
 .../process/computer/GiraphGraphComputer.java   |   8 +-
 .../computer/io/GiraphVertexInputFormat.java    |  70 ------
 .../computer/io/GiraphVertexOutputFormat.java   |  65 ------
 .../process/computer/io/GiraphVertexReader.java |  67 ------
 .../process/computer/io/GiraphVertexWriter.java |  57 -----
 .../structure/io/GiraphVertexInputFormat.java   |  70 ++++++
 .../structure/io/GiraphVertexOutputFormat.java  |  65 ++++++
 .../giraph/structure/io/GiraphVertexReader.java |  67 ++++++
 .../giraph/structure/io/GiraphVertexWriter.java |  57 +++++
 .../hadoop/structure/io/InputOutputHelper.java  |  22 ++
 .../hadoop/structure/util/HadoopHelper.java     |  50 -----
 .../process/computer/SparkGraphComputer.java    |  15 +-
 .../process/computer/io/InputFormatRDD.java     |  47 ----
 .../spark/process/computer/io/InputRDD.java     |  41 ----
 .../process/computer/io/OutputFormatRDD.java    |  49 -----
 .../spark/process/computer/io/OutputRDD.java    |  31 ---
 .../process/computer/io/PersistedInputRDD.java  |  60 -----
 .../process/computer/io/PersistedOutputRDD.java |  41 ----
 .../spark/structure/io/InputFormatRDD.java      |  47 ++++
 .../spark/structure/io/InputOutputHelper.java   |  81 +++++++
 .../gremlin/spark/structure/io/InputRDD.java    |  41 ++++
 .../spark/structure/io/OutputFormatRDD.java     |  49 +++++
 .../gremlin/spark/structure/io/OutputRDD.java   |  31 +++
 .../spark/structure/io/PersistedInputRDD.java   |  60 +++++
 .../spark/structure/io/PersistedOutputRDD.java  |  41 ++++
 .../process/computer/io/ExampleInputRDD.java    |  47 ----
 .../process/computer/io/ExampleOutputRDD.java   |  45 ----
 .../process/computer/io/InputOutputRDDTest.java |  59 -----
 .../spark/process/computer/io/InputRDDTest.java |  54 -----
 .../process/computer/io/OutputRDDTest.java      |  62 ------
 .../io/PersistedInputOutputRDDTest.java         | 172 ---------------
 .../spark/structure/io/ExampleInputRDD.java     |  48 ++++
 .../spark/structure/io/ExampleOutputRDD.java    |  46 ++++
 .../spark/structure/io/InputOutputRDDTest.java  |  59 +++++
 .../spark/structure/io/InputRDDTest.java        |  54 +++++
 .../spark/structure/io/OutputRDDTest.java       |  62 ++++++
 .../io/PersistedInputOutputRDDTest.java         | 217 +++++++++++++++++++
 38 files changed, 1131 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/docs/src/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/implementations.asciidoc b/docs/src/implementations.asciidoc
index 8110c5f..7792665 100644
--- a/docs/src/implementations.asciidoc
+++ b/docs/src/implementations.asciidoc
@@ -1001,8 +1001,8 @@ spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerial
 ####################################
 # SparkGraphComputer Configuration #
 ####################################
-gremlin.spark.graphInputRDD=org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDDFormat
-gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDDFormat
+gremlin.spark.graphInputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD
+gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD
 gremlin.spark.persistContext=true
 #####################################
 # GiraphGraphComputer Configuration #

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 4c33220..0389750 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -34,17 +34,17 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.io.GiraphVertexInputFormat;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.io.GiraphVertexOutputFormat;
+import org.apache.tinkerpop.gremlin.giraph.structure.io.GiraphVertexInputFormat;
+import org.apache.tinkerpop.gremlin.giraph.structure.io.GiraphVertexOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
@@ -126,7 +126,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
             }
 
             this.memory.setRuntime(System.currentTimeMillis() - startTime);
-            return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), this.memory.asImmutable());
+            return new DefaultComputerResult(InputOutputHelper.getOutputGraph(ConfUtil.makeApacheConfiguration(this.giraphConfiguration), this.resultGraph, this.persist), this.memory.asImmutable());
         });
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
deleted file mode 100644
index e5407da..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexInputFormat.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexInputFormat extends VertexInputFormat {
-
-    private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
-
-    @Override
-    public void checkInputSpecs(final Configuration configuration) {
-
-    }
-
-    @Override
-    public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return this.hadoopGraphInputFormat.getSplits(context);
-    }
-
-    @Override
-    public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
-        this.constructor(context.getConfiguration());
-        try {
-            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-    }
-
-    private final void constructor(final Configuration configuration) {
-        if (null == this.hadoopGraphInputFormat) {
-            this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
deleted file mode 100644
index c1360c7..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexOutputFormat.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexOutputFormat extends VertexOutputFormat {
-
-    private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
-
-    @Override
-    public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
-    }
-
-    @Override
-    public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        this.hadoopGraphOutputFormat.checkOutputSpecs(context);
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.constructor(context.getConfiguration());
-        return this.hadoopGraphOutputFormat.getOutputCommitter(context);
-    }
-
-    private final void constructor(final Configuration configuration) {
-        if (null == this.hadoopGraphOutputFormat) {
-            this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
deleted file mode 100644
index c8ed797..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexReader.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexReader extends VertexReader {
-
-    private RecordReader<NullWritable, VertexWritable> recordReader;
-
-    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
-        this.recordReader = recordReader;
-    }
-
-    @Override
-    public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.recordReader.initialize(inputSplit, context);
-    }
-
-    @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-        return this.recordReader.nextKeyValue();
-    }
-
-    @Override
-    public Vertex getCurrentVertex() throws IOException, InterruptedException {
-        return new GiraphVertex(this.recordReader.getCurrentValue());
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.recordReader.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-        return this.recordReader.getProgress();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
deleted file mode 100644
index d67b736..0000000
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/io/GiraphVertexWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexWriter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GiraphVertexWriter extends VertexWriter {
-    private final OutputFormat<NullWritable, VertexWritable> outputFormat;
-    private RecordWriter<NullWritable, VertexWritable> recordWriter;
-
-    public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
-        this.outputFormat = outputFormat;
-    }
-
-    @Override
-    public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.recordWriter = this.outputFormat.getRecordWriter(context);
-    }
-
-    @Override
-    public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
-        this.recordWriter.close(context);
-    }
-
-    @Override
-    public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
-        this.recordWriter.write(NullWritable.get(), ((GiraphVertex) vertex).getValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
new file mode 100644
index 0000000..6b6638a
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexInputFormat extends VertexInputFormat {
+
+    private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
+
+    @Override
+    public void checkInputSpecs(final Configuration configuration) {
+
+    }
+
+    @Override
+    public List<InputSplit> getSplits(final JobContext context, final int minSplitCountHint) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        return this.hadoopGraphInputFormat.getSplits(context);
+    }
+
+    @Override
+    public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
+        this.constructor(context.getConfiguration());
+        try {
+            return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context));
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private final void constructor(final Configuration configuration) {
+        if (null == this.hadoopGraphInputFormat) {
+            this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
new file mode 100644
index 0000000..2d9b4ba
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexOutputFormat.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexOutputFormat extends VertexOutputFormat {
+
+    private OutputFormat<NullWritable, VertexWritable> hadoopGraphOutputFormat;
+
+    @Override
+    public VertexWriter createVertexWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        return new GiraphVertexWriter(this.hadoopGraphOutputFormat);
+    }
+
+    @Override
+    public void checkOutputSpecs(final JobContext context) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        this.hadoopGraphOutputFormat.checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.constructor(context.getConfiguration());
+        return this.hadoopGraphOutputFormat.getOutputCommitter(context);
+    }
+
+    private final void constructor(final Configuration configuration) {
+        if (null == this.hadoopGraphOutputFormat) {
+            this.hadoopGraphOutputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class, OutputFormat.class), configuration);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
new file mode 100644
index 0000000..2a6ade6
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexReader extends VertexReader {
+
+    private RecordReader<NullWritable, VertexWritable> recordReader;
+
+    public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader) {
+        this.recordReader = recordReader;
+    }
+
+    @Override
+    public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.recordReader.initialize(inputSplit, context);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return this.recordReader.nextKeyValue();
+    }
+
+    @Override
+    public Vertex getCurrentVertex() throws IOException, InterruptedException {
+        return new GiraphVertex(this.recordReader.getCurrentValue());
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.recordReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return this.recordReader.getProgress();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
new file mode 100644
index 0000000..43c4b0b
--- /dev/null
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.structure.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+import java.io.IOException;
+
+/**
+ * Giraph {@link VertexWriter} that writes each {@link GiraphVertex}'s {@link VertexWritable} value
+ * through a {@link RecordWriter} obtained from the wrapped Hadoop {@link OutputFormat}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphVertexWriter extends VertexWriter {
+
+    /** Source of the record writer; asked for one once the task context is known. */
+    private final OutputFormat<NullWritable, VertexWritable> outputFormat;
+    /** Created by {@link #initialize}; {@code null} until then. */
+    private RecordWriter<NullWritable, VertexWritable> writer;
+
+    public GiraphVertexWriter(final OutputFormat<NullWritable, VertexWritable> outputFormat) {
+        this.outputFormat = outputFormat;
+    }
+
+    @Override
+    public void initialize(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.writer = this.outputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public void writeVertex(final Vertex vertex) throws IOException, InterruptedException {
+        // the cast assumes every vertex handed to this writer is a GiraphVertex; the key is always NullWritable
+        final VertexWritable value = ((GiraphVertex) vertex).getValue();
+        this.writer.write(NullWritable.get(), value);
+    }
+
+    @Override
+    public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
+        this.writer.close(context);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
index 8a80053..04097c1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
@@ -18,15 +18,21 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptOutputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -65,4 +71,37 @@ public final class InputOutputHelper {
         INPUT_TO_OUTPUT_CACHE.put(inputFormat, outputFormat);
         OUTPUT_TO_INPUT_CACHE.put(outputFormat, inputFormat);
     }
+
+    /**
+     * Creates the {@link HadoopGraph} through which the result of a {@link GraphComputer} job is exposed.
+     * <p>
+     * The graph is opened over a copy of {@code configuration} whose
+     * {@link Constants#GREMLIN_HADOOP_OUTPUT_LOCATION} has {@code "_"} appended (presumably so a follow-up
+     * job does not write over this job's output). When {@code resultGraph} is
+     * {@link GraphComputer.ResultGraph#NEW}, the copy is additionally re-pointed at the configured output:
+     * its input location becomes {@code <outputLocation>/<HIDDEN_G>}, its graph input format becomes the one
+     * {@link #getInputFormat} pairs with the configured graph output format (only if one is configured), and
+     * {@link Constants#GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES} records whether edges were persisted.
+     *
+     * @param configuration the configuration whose properties seed the returned graph's configuration
+     * @param resultGraph   only {@link GraphComputer.ResultGraph#NEW} re-points the input side
+     * @param persist       only {@link GraphComputer.Persist#EDGES} marks the new input as having edges
+     * @return a {@link HadoopGraph} opened over the derived configuration
+     */
+    public static HadoopGraph getOutputGraph(final Configuration configuration, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
+        final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
+        final String outputLocation = hadoopConfiguration.getOutputLocation();
+        final BaseConfiguration newConfiguration = new BaseConfiguration();
+        newConfiguration.copy(hadoopConfiguration);
+        if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
+            // read the new graph back from where the job was configured to write it
+            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, outputLocation + "/" + Constants.HIDDEN_G);
+            if (hadoopConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
+                newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopConfiguration.getGraphOutputFormat()).getCanonicalName());
+            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, persist.equals(GraphComputer.Persist.EDGES));
+        }
+        // set identically whether or not resultGraph is NEW, so it is done once here
+        newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation + "_");
+        return HadoopGraph.open(newConfiguration);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
deleted file mode 100644
index ebfdf91..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.util;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HadoopHelper {
-
-    private HadoopHelper() {
-    }
-
-    public static HadoopGraph getOutputGraph(final HadoopGraph hadoopGraph, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
-        final BaseConfiguration newConfiguration = new BaseConfiguration();
-        newConfiguration.copy(hadoopGraph.configuration());
-        if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "/" + Constants.HIDDEN_G);
-            if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
-                newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopGraph.configuration().getGraphOutputFormat()).getCanonicalName());
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "_");
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, persist.equals(GraphComputer.Persist.EDGES));
-        } else {
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "_");
-        }
-        return HadoopGraph.open(newConfiguration);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 0656f5a..b25073b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
@@ -45,11 +44,12 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
-import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
 
 import java.io.File;
 import java.io.IOException;
@@ -209,6 +209,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         // write the map reduce output back to disk (memory)
                         SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                     }
+                    mapReduceGraphRDD.unpersist();
                 }
 
                 // unpersist the graphRDD if it will no longer be used
@@ -217,9 +218,9 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 }
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
-                return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
+                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                if (sparkContext != null && !this.hadoopGraph.configuration().getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                if (sparkContext != null && !apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     sparkContext.stop();
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
deleted file mode 100644
index adb080b..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class InputFormatRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
-        return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
-                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
-                NullWritable.class,
-                VertexWritable.class)
-                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
-                .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
deleted file mode 100644
index 291fcd3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
- * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface InputRDD {
-
-    /**
-     * Read the graphRDD from the underlying graph system.
-     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
-     * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
-     * @return an adjacency list representation of the underlying graph system.
-     */
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
deleted file mode 100644
index 56a1297..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class OutputFormatRDD implements OutputRDD {
-
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null != outputLocation) {
-            // map back to a <nullwritable,vertexwritable> stream for output
-            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
-                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
-                            NullWritable.class,
-                            VertexWritable.class,
-                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
deleted file mode 100644
index 2580252..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface OutputRDD {
-
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
deleted file mode 100644
index ad521b3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputRDD.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import scala.Tuple2;
-import scala.collection.Iterator;
-
-import java.util.Optional;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class PersistedInputRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final String inputRDDName = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
-        if (null == inputRDDName)
-            throw new IllegalArgumentException(PersistedInputRDD.class.getSimpleName() + " requires " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " in order to retrieve the named graphRDD from the SparkContext");
-        if (!PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).isPresent())
-            throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + inputRDDName);
-        return JavaPairRDD.fromJavaRDD((JavaRDD) PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).get().toJavaRDD());
-    }
-
-    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
-        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
-                getPersistentRDDs().
-                toList().iterator();
-        while (iterator.hasNext()) {
-            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
-            if (tuple2._2().toString().contains(rddName))
-                return Optional.of(tuple2._2());
-        }
-        return Optional.empty();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
deleted file mode 100644
index 1832ca3..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedOutputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class PersistedOutputRDD implements OutputRDD {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
-
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
-            LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
new file mode 100644
index 0000000..f768939
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * An {@link InputRDD} that reads the graphRDD through the Hadoop {@link InputFormat} configured under
+ * {@link Constants#GREMLIN_HADOOP_GRAPH_INPUT_FORMAT}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputFormatRDD implements InputRDD {
+
+    /**
+     * Reads every vertex produced by the configured input format and keys it by vertex id.
+     *
+     * @param configuration the configuration, converted to a Hadoop configuration before reading
+     * @param sparkContext  the context used to create the Hadoop-backed RDD
+     * @return a pair RDD of vertex id to {@link VertexWritable}, with at most one entry per id
+     */
+    @Override
+    @SuppressWarnings("unchecked") // Hadoop's Configuration#getClass returns Class<?>, so this cast cannot be checked
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+        final Class<InputFormat<NullWritable, VertexWritable>> inputFormatClass =
+                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class);
+        return sparkContext.newAPIHadoopRDD(hadoopConfiguration, inputFormatClass, NullWritable.class, VertexWritable.class)
+                // copy each vertex: Hadoop record readers may reuse the same Writable instance across records
+                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+                .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
new file mode 100644
index 0000000..b1e1b0a
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry that pairs {@link InputRDD} and {@link OutputRDD} implementations, plus a helper that derives the
+ * output {@link HadoopGraph} of a Spark computation using those pairs.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputOutputHelper {
+
+    // The maps are mutated by registerInputOutputPair(), hence concurrent maps; the references never change.
+    private static final Map<Class<? extends InputRDD>, Class<? extends OutputRDD>> INPUT_TO_OUTPUT_CACHE = new ConcurrentHashMap<>();
+    private static final Map<Class<? extends OutputRDD>, Class<? extends InputRDD>> OUTPUT_TO_INPUT_CACHE = new ConcurrentHashMap<>();
+
+    static {
+        registerInputOutputPair(PersistedInputRDD.class, PersistedOutputRDD.class);
+        registerInputOutputPair(InputFormatRDD.class, OutputFormatRDD.class);
+    }
+
+    private InputOutputHelper() {
+        // static utility class; not instantiable
+    }
+
+    /**
+     * Returns the {@link InputRDD} registered as the reader for the given {@link OutputRDD}.
+     *
+     * @param outputRDD the writer class
+     * @return the paired reader class, or {@code null} if none is registered
+     */
+    public static Class<? extends InputRDD> getInputFormat(final Class<? extends OutputRDD> outputRDD) {
+        return OUTPUT_TO_INPUT_CACHE.get(outputRDD);
+    }
+
+    /**
+     * Returns the {@link OutputRDD} registered as the writer for the given {@link InputRDD}.
+     *
+     * @param inputRDD the reader class
+     * @return the paired writer class, or {@code null} if none is registered
+     */
+    public static Class<? extends OutputRDD> getOutputFormat(final Class<? extends InputRDD> inputRDD) {
+        return INPUT_TO_OUTPUT_CACHE.get(inputRDD);
+    }
+
+    /**
+     * Registers {@code inputRDD} and {@code outputRDD} as each other's counterpart, replacing earlier pairings.
+     *
+     * @param inputRDD  the reader class
+     * @param outputRDD the writer class
+     */
+    public static void registerInputOutputPair(final Class<? extends InputRDD> inputRDD, final Class<? extends OutputRDD> outputRDD) {
+        INPUT_TO_OUTPUT_CACHE.put(inputRDD, outputRDD);
+        OUTPUT_TO_INPUT_CACHE.put(outputRDD, inputRDD);
+    }
+
+    /**
+     * Builds the output {@link HadoopGraph} of a computation. The configuration starts as a copy of the one produced by
+     * the Hadoop-level {@code InputOutputHelper.getOutputGraph}. When the result graph is
+     * {@link GraphComputer.ResultGraph#NEW} and {@link Constants#GREMLIN_SPARK_GRAPH_OUTPUT_RDD} is set, the registered
+     * {@link InputRDD} for that output RDD becomes {@link Constants#GREMLIN_SPARK_GRAPH_INPUT_RDD}, and a trailing
+     * {@code "/" + Constants.HIDDEN_G} is removed from {@link Constants#GREMLIN_HADOOP_INPUT_LOCATION}.
+     *
+     * @param configuration the computation's configuration
+     * @param resultGraph   the result graph mode
+     * @param persist       the persist mode
+     * @return the output graph
+     * @throws IllegalArgumentException if the configured OutputRDD class cannot be loaded or has no registered InputRDD
+     */
+    public static HadoopGraph getOutputGraph(final Configuration configuration, final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
+        final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
+        final BaseConfiguration newConfiguration = new BaseConfiguration();
+        newConfiguration.copy(org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper.getOutputGraph(configuration, resultGraph, persist).configuration());
+        if (GraphComputer.ResultGraph.NEW == resultGraph && hadoopConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD)) {
+            final String outputRDDClassName = hadoopConfiguration.getProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD).toString();
+            newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, resolveInputRDD(outputRDDClassName).getCanonicalName());
+            // Spark RDDs are not namespaced the same as Hadoop: drop only the trailing hidden-graph segment,
+            // never other occurrences of it elsewhere in the path.
+            final String inputLocation = newConfiguration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, "");
+            final String hiddenSuffix = "/" + Constants.HIDDEN_G;
+            if (inputLocation.endsWith(hiddenSuffix)) {
+                newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation.substring(0, inputLocation.length() - hiddenSuffix.length()));
+            }
+        }
+        return HadoopGraph.open(newConfiguration);
+    }
+
+    // Loads the named OutputRDD class and returns its registered InputRDD, failing with a descriptive message.
+    private static Class<? extends InputRDD> resolveInputRDD(final String outputRDDClassName) {
+        final Class<?> outputRDDClass;
+        try {
+            outputRDDClass = Class.forName(outputRDDClassName);
+        } catch (final ClassNotFoundException e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+        final Class<? extends InputRDD> inputRDDClass = OUTPUT_TO_INPUT_CACHE.get(outputRDDClass);
+        if (null == inputRDDClass)
+            throw new IllegalArgumentException("No " + InputRDD.class.getSimpleName() + " is registered for the " + OutputRDD.class.getSimpleName() + " " + outputRDDClassName + "; use registerInputOutputPair() to register one");
+        return inputRDDClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
new file mode 100644
index 0000000..c84c189
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * Loads a graph from some underlying graph system into Spark as an adjacency-list RDD (the graphRDD).
+ * {@link InputFormatRDD} is one implementation: it derives the graphRDD from the configured
+ * {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface InputRDD {
+
+    /**
+     * Produces the graphRDD from the underlying graph system.
+     *
+     * @param configuration the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer} configuration
+     * @param sparkContext  the Spark context whose methods are used to create the {@link JavaPairRDD}
+     * @return the underlying graph as an adjacency list of keyed {@link VertexWritable} entries
+     */
+    JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
new file mode 100644
index 0000000..cc6ed61
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class OutputFormatRDD implements OutputRDD {
+
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a <nullwritable,vertexwritable> stream for output
+            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
+                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
+                            NullWritable.class,
+                            VertexWritable.class,
+                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
new file mode 100644
index 0000000..c2964eb
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * The write-side counterpart of {@link InputRDD}: it receives the graphRDD produced by a Spark graph
+ * computation and handles its output.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface OutputRDD {
+
+    /**
+     * Handles the output of the given graphRDD.
+     *
+     * @param configuration the configuration of the graph computation
+     * @param graphRDD      the graph as an adjacency list of keyed {@link VertexWritable} entries
+     */
+    void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
new file mode 100644
index 0000000..e3c27ec
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final String inputRDDName = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION, null);
+        if (null == inputRDDName)
+            throw new IllegalArgumentException(PersistedInputRDD.class.getSimpleName() + " requires " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " in order to retrieve the named graphRDD from the SparkContext");
+        if (!PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).isPresent())
+            throw new IllegalArgumentException("The provided graphRDD name is not in the persisted RDDs of the SparkContext: " + inputRDDName);
+        return JavaPairRDD.fromJavaRDD((JavaRDD) PersistedInputRDD.getPersistedRDD(sparkContext, inputRDDName).get().toJavaRDD());
+    }
+
+    public static Optional<RDD<?>> getPersistedRDD(final JavaSparkContext sparkContext, final String rddName) {
+        final Iterator<Tuple2<Object, RDD<?>>> iterator = JavaSparkContext.toSparkContext(sparkContext).
+                getPersistentRDDs().
+                toList().iterator();
+        while (iterator.hasNext()) {
+            final Tuple2<Object, RDD<?>> tuple2 = iterator.next();
+            if (tuple2._2().toString().contains(rddName))
+                return Optional.of(tuple2._2());
+        }
+        return Optional.empty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
new file mode 100644
index 0000000..abeeaaa
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link OutputRDD} paired with {@link PersistedInputRDD}. It writes nothing itself; it only warns when
+ * {@link Constants#GREMLIN_SPARK_PERSIST_CONTEXT} is not enabled, because the RDD cannot outlive a SparkContext
+ * that is not persisted across jobs.
+ * NOTE(review): this method never touches {@code graphRDD}; persisting it presumably happens in the caller — confirm.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class PersistedOutputRDD implements OutputRDD {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
+
+    /**
+     * Logs a warning if the SparkContext is not configured to persist across jobs.
+     *
+     * @param configuration the configuration checked for {@link Constants#GREMLIN_SPARK_PERSIST_CONTEXT}
+     * @param graphRDD      unused by this implementation
+     */
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        if (!configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+            LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set {} to true", Constants.GREMLIN_SPARK_PERSIST_CONTEXT);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
deleted file mode 100644
index 1fb85a1..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.T;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Test {@link InputRDD} that ignores its configuration and builds a small in-memory graphRDD of four "person"
- * vertices (ids 1, 2, 4 and 6 with ages 29, 27, 32 and 35), keyed by vertex id.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleInputRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final List<Vertex> list = new ArrayList<>();
-        // each vertex is created in its own StarGraph and no edges are added
-        list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
-        list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
-        list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
-        list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
-        // distribute the vertices and key each one by its id, wrapped in a VertexWritable
-        return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
deleted file mode 100644
index bb38f7f..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test {@link OutputRDD} that writes nothing; instead it asserts that the ages of all "person" vertices in the
- * graphRDD sum to 123 (29 + 27 + 32 + 35, the ages produced by ExampleInputRDD).
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleOutputRDD implements OutputRDD {
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        int totalAge = 0;
-        // pull the vertices to the driver one partition at a time
-        final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
-        while (iterator.hasNext()) {
-            final Vertex vertex = iterator.next().get();
-            if (vertex.label().equals("person"))
-                totalAge = totalAge + vertex.<Integer>value("age");
-        }
-        assertEquals(123, totalAge);
-    }
-}