You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/03 20:50:32 UTC

[3/3] incubator-tinkerpop git commit: Spark is now isolated into spark-gremlin package. Tests are passing. A few hacks here and there just to get things building. However, it was pretty easy to split apart -- which is pomising.

Spark is now isolated into spark-gremlin package. Tests are passing. A few hacks here and there just to get things building. However, it was pretty easy to split apart -- which is pomising.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/04f5651e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/04f5651e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/04f5651e

Branch: refs/heads/hadoop_split
Commit: 04f5651e8d8e9108a4281d959b71ff161a52c208
Parents: 0977a25
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Sep 3 12:50:24 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Sep 3 12:50:24 2015 -0600

----------------------------------------------------------------------
 hadoop-gremlin/pom.xml                          |  62 -----
 .../groovy/plugin/HadoopGremlinPlugin.java      |   3 -
 .../process/computer/spark/RuleAccumulator.java |  55 ----
 .../process/computer/spark/SparkExecutor.java   | 222 -----------------
 .../computer/spark/SparkGraphComputer.java      | 216 ----------------
 .../process/computer/spark/SparkMapEmitter.java |  45 ----
 .../process/computer/spark/SparkMemory.java     | 181 --------------
 .../process/computer/spark/SparkMessenger.java  |  83 -------
 .../computer/spark/SparkReduceEmitter.java      |  45 ----
 .../computer/spark/io/InputFormatRDD.java       |  46 ----
 .../process/computer/spark/io/InputRDD.java     |  41 ---
 .../computer/spark/io/OutputFormatRDD.java      |  48 ----
 .../process/computer/spark/io/OutputRDD.java    |  31 ---
 .../computer/spark/payload/MessagePayload.java  |  35 ---
 .../process/computer/spark/payload/Payload.java |  27 --
 .../spark/payload/ViewIncomingPayload.java      |  95 -------
 .../spark/payload/ViewOutgoingPayload.java      |  46 ----
 .../computer/spark/payload/ViewPayload.java     |  39 ---
 .../gremlin/hadoop/structure/HadoopGraph.java   |  16 +-
 .../spark/HadoopSparkGraphProvider.java         |  37 ---
 .../SparkGraphComputerProcessIntegrateTest.java |  32 ---
 ...GraphComputerGroovyProcessIntegrateTest.java |  33 ---
 .../groovy/SparkHadoopGremlinPluginTest.java    |  33 ---
 .../computer/spark/io/ExampleInputRDD.java      |  47 ----
 .../computer/spark/io/ExampleOutputRDD.java     |  45 ----
 .../computer/spark/io/InputOutputRDDTest.java   |  59 -----
 .../process/computer/spark/io/InputRDDTest.java |  54 ----
 .../computer/spark/io/OutputRDDTest.java        |  62 -----
 pom.xml                                         |   1 +
 spark-gremlin/pom.xml                           | 249 +++++++++++++++++++
 .../spark/groovy/plugin/SparkGremlinPlugin.java |  26 ++
 .../spark/process/computer/RuleAccumulator.java |  55 ++++
 .../spark/process/computer/SparkExecutor.java   | 202 +++++++++++++++
 .../process/computer/SparkGraphComputer.java    | 217 ++++++++++++++++
 .../spark/process/computer/SparkMapEmitter.java |  45 ++++
 .../spark/process/computer/SparkMemory.java     | 181 ++++++++++++++
 .../spark/process/computer/SparkMessenger.java  |  83 +++++++
 .../process/computer/SparkReduceEmitter.java    |  45 ++++
 .../process/computer/io/InputFormatRDD.java     |  47 ++++
 .../spark/process/computer/io/InputRDD.java     |  41 +++
 .../process/computer/io/OutputFormatRDD.java    |  49 ++++
 .../spark/process/computer/io/OutputRDD.java    |  31 +++
 .../computer/payload/MessagePayload.java        |  35 +++
 .../spark/process/computer/payload/Payload.java |  27 ++
 .../computer/payload/ViewIncomingPayload.java   |  95 +++++++
 .../computer/payload/ViewOutgoingPayload.java   |  46 ++++
 .../process/computer/payload/ViewPayload.java   |  39 +++
 .../spark/process/HadoopGraphProvider.java      | 166 +++++++++++++
 .../computer/HadoopSparkGraphProvider.java      |  36 +++
 .../SparkGraphComputerProcessIntegrateTest.java |  32 +++
 ...GraphComputerGroovyProcessIntegrateTest.java |  33 +++
 .../groovy/SparkHadoopGremlinPluginTest.java    |  32 +++
 .../process/computer/io/ExampleInputRDD.java    |  47 ++++
 .../process/computer/io/ExampleOutputRDD.java   |  45 ++++
 .../process/computer/io/InputOutputRDDTest.java |  59 +++++
 .../spark/process/computer/io/InputRDDTest.java |  54 ++++
 .../process/computer/io/OutputRDDTest.java      |  62 +++++
 57 files changed, 2091 insertions(+), 1727 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index cb6a050..2466da0 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -108,68 +108,6 @@ limitations under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-        <!-- SPARK GRAPH COMPUTER -->
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
-            <version>1.2.1</version>
-            <exclusions>
-                <!-- self conflicts -->
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-databind</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-lang3</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-codec</groupId>
-                    <artifactId>commons-codec</artifactId>
-                </exclusion>
-                <!-- gremlin-core conflicts -->
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>jcl-over-slf4j</artifactId>
-                </exclusion>
-                <!-- gremlin-groovy conflicts -->
-                <exclusion>
-                    <groupId>jline</groupId>
-                    <artifactId>jline</artifactId>
-                </exclusion>
-                <!-- hadoop conflicts -->
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-client</artifactId>
-                </exclusion>
-                <!-- giraph conflicts -->
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty</artifactId>
-                </exclusion>
-                <!-- lgpl conflicts -->
-                <exclusion>
-                    <groupId>com.google.code.findbugs</groupId>
-                    <artifactId>findbugs</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <!-- consistent dependencies -->
         <dependency>
             <groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
index a34153e..18c4b32 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
@@ -30,7 +30,6 @@ import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteAcceptor;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.mapreduce.MapReduceGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
@@ -71,7 +70,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
         add(IMPORT_SPACE + HDFSTools.class.getPackage().getName() + DOT_STAR);
         ////
         add(IMPORT_SPACE + GiraphGraphComputer.class.getPackage().getName() + DOT_STAR);
-        add(IMPORT_SPACE + SparkGraphComputer.class.getPackage().getName() + DOT_STAR);
         add(IMPORT_SPACE + MapReduceGraphComputer.class.getPackage().getName() + DOT_STAR);
     }};
 
@@ -92,7 +90,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", Job.class.getName()));
             ///
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", GiraphGraphComputer.class.getName()));
-            pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", SparkGraphComputer.class.getName()));
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", MapReduceGraphComputer.class.getName()));
             ///
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", HadoopGraph.class.getName()));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
deleted file mode 100644
index 422b676..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.spark.AccumulatorParam;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class RuleAccumulator implements AccumulatorParam<Rule> {
-
-    @Override
-    public Rule addAccumulator(final Rule a, final Rule b) {
-        if (a.getOperation().equals(Rule.Operation.NO_OP))
-            return b;
-        if (b.getOperation().equals(Rule.Operation.NO_OP))
-            return a;
-        else
-            return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
-    }
-
-    @Override
-    public Rule addInPlace(final Rule a, final Rule b) {
-        if (a.getOperation().equals(Rule.Operation.NO_OP))
-            return b;
-        if (b.getOperation().equals(Rule.Operation.NO_OP))
-            return a;
-        else
-            return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
-    }
-
-    @Override
-    public Rule zero(final Rule rule) {
-        return new Rule(Rule.Operation.NO_OP, null);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
deleted file mode 100644
index b7268b8..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import com.google.common.base.Optional;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.MessagePayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.Payload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewOutgoingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkExecutor {
-
-    private static final String[] EMPTY_ARRAY = new String[0];
-
-    private SparkExecutor() {
-    }
-
-    ////////////////////
-    // VERTEX PROGRAM //
-    ////////////////////
-
-    public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
-            final JavaPairRDD<Object, VertexWritable> graphRDD,
-            final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
-            final SparkMemory memory,
-            final Configuration apacheConfiguration) {
-
-        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
-                graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
-                graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
-                // for each partition of vertices
-                .mapPartitionsToPair(partitionIterator -> {
-                    HadoopPools.initialize(apacheConfiguration);
-                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
-                    final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
-                    final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
-                    final SparkMessenger<M> messenger = new SparkMessenger<>();
-                    workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
-                    return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
-                        final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
-                        // drop any compute properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0)
-                            vertex.dropVertexProperties(elementComputeKeysArray);
-                        final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
-                        final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
-                        final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
-                        previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
-                        ///
-                        messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
-                        workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
-                        ///
-                        final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
-                                Collections.emptyList() :
-                                IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
-                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
-                        if (!partitionIterator.hasNext())
-                            workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
-                        return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
-                    });
-                })).setName("viewOutgoingRDD");
-
-        // "message pass" by reducing on the vertex object id of the view and message payloads
-        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
-                .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
-                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
-                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
-                .reduceByKey((a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
-                    if (a instanceof ViewIncomingPayload) {
-                        ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
-                        return a;
-                    } else if (b instanceof ViewIncomingPayload) {
-                        ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
-                        return b;
-                    } else {
-                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
-                        c.mergePayload(a, messageCombiner);
-                        c.mergePayload(b, messageCombiner);
-                        return c;
-                    }
-                })
-                .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
-                .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
-                .mapValues(payload -> payload instanceof ViewIncomingPayload ?
-                        (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
-                        new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
-
-        newViewIncomingRDD.setName("viewIncomingRDD")
-                .foreachPartition(partitionIterator -> {
-                    HadoopPools.initialize(apacheConfiguration);
-                }); // need to complete a task so its BSP and the memory for this iteration is updated
-        return newViewIncomingRDD;
-    }
-
-    /////////////////
-    // MAP REDUCE //
-    ////////////////
-
-    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
-        return (null == viewIncomingRDD) ?   // there was no vertex program
-                graphRDD.mapValues(vertexWritable -> {
-                    vertexWritable.get().dropEdges();
-                    return vertexWritable;
-                }) :
-                graphRDD.leftOuterJoin(viewIncomingRDD)
-                        .mapValues(tuple -> {
-                            final StarGraph.StarVertex vertex = tuple._1().get();
-                            vertex.dropEdges();
-                            vertex.dropVertexProperties(elementComputeKeys);
-                            final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
-                            view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
-                            return tuple._1();
-                        });
-    }
-
-    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-            HadoopPools.initialize(apacheConfiguration);
-            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
-            workerMapReduce.workerStart(MapReduce.Stage.MAP);
-            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
-            return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
-                workerMapReduce.map(ComputerGraph.mapReduce(vertexWritable._2().get()), mapEmitter);
-                if (!partitionIterator.hasNext())
-                    workerMapReduce.workerEnd(MapReduce.Stage.MAP);
-                return mapEmitter.getEmissions();
-            });
-        });
-        if (mapReduce.getMapKeySort().isPresent())
-            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
-        return mapRDD;
-    }
-
-    // TODO: public static executeCombine()  is this necessary?  YES --- we groupByKey in reduce() where we want to combine first.
-
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
-            HadoopPools.initialize(apacheConfiguration);
-            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
-            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
-            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
-            return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
-                workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
-                if (!partitionIterator.hasNext())
-                    workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
-                return reduceEmitter.getEmissions();
-            });
-        });
-        if (mapReduce.getReduceKeySort().isPresent())
-            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
-        return reduceRDD;
-    }
-
-    ///////////////////
-    // Input/Output //
-    //////////////////
-
-    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION,null);
-        if (null != outputLocation) {
-            // map back to a Hadoop stream for output
-            mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
-                    ObjectWritable.class,
-                    ObjectWritable.class,
-                    SequenceFileOutputFormat.class, hadoopConfiguration);
-           // TODO: mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
-            try {
-            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
deleted file mode 100644
index eb1e411..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.FileConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputFormatRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputFormatRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Stream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
-
-    public SparkGraphComputer(final HadoopGraph hadoopGraph) {
-        super(hadoopGraph);
-    }
-
-    @Override
-    public Future<ComputerResult> submit() {
-        super.validateStatePriorToExecution();
-        // apache and hadoop configurations that are used throughout the graph computer computation
-        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
-        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
-        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
-        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-            try {
-                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
-                hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-
-        // create the completable future
-        return CompletableFuture.<ComputerResult>supplyAsync(() -> {
-            final long startTime = System.currentTimeMillis();
-            SparkMemory memory = null;
-            // delete output location
-            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-            if (null != outputLocation) {
-                try {
-                    FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-            }
-            // wire up a spark context
-            final SparkConf sparkConfiguration = new SparkConf();
-            sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
-                    /*final List<Class> classes = new ArrayList<>();
-                    classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
-                    classes.addAll(IOClasses.getSharedHadoopClasses());
-                    classes.add(ViewPayload.class);
-                    classes.add(MessagePayload.class);
-                    classes.add(ViewIncomingPayload.class);
-                    classes.add(ViewOutgoingPayload.class);
-                    sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
-
-            // create the spark configuration from the graph computer configuration
-            hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-            // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
-            try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
-                // add the project jars to the cluster
-                this.loadJars(sparkContext, hadoopConfiguration);
-                // create a message-passing friendly rdd from the input rdd
-                final JavaPairRDD<Object, VertexWritable> graphRDD;
-                try {
-                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
-                            .newInstance()
-                            .readGraphRDD(apacheConfiguration, sparkContext)
-                            .setName("graphRDD")
-                            .cache();
-                } catch (final InstantiationException | IllegalAccessException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-                JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
-
-                ////////////////////////////////
-                // process the vertex program //
-                ////////////////////////////////
-                if (null != this.vertexProgram) {
-                    // set up the vertex program and wire up configurations
-                    memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
-                    this.vertexProgram.setup(memory);
-                    memory.broadcastMemory(sparkContext);
-                    final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
-                    this.vertexProgram.storeState(vertexProgramConfiguration);
-                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
-                    ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
-
-                    // execute the vertex program
-                    while (true) {
-                        memory.setInTask(true);
-                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
-                        memory.setInTask(false);
-                        if (this.vertexProgram.terminate(memory))
-                            break;
-                        else {
-                            memory.incrIteration();
-                            memory.broadcastMemory(sparkContext);
-                        }
-                    }
-                    // write the graph rdd using the output rdd
-                    if (!this.persist.equals(Persist.NOTHING)) {
-                        try {
-                            hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
-                                    .newInstance()
-                                    .writeGraphRDD(apacheConfiguration, graphRDD);
-                        } catch (final InstantiationException | IllegalAccessException e) {
-                            throw new IllegalStateException(e.getMessage(), e);
-                        }
-                    }
-                }
-
-                final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
-
-                //////////////////////////////
-                // process the map reducers //
-                //////////////////////////////
-                if (!this.mapReducers.isEmpty()) {
-                    final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
-                    final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
-                    for (final MapReduce mapReduce : this.mapReducers) {
-                        // execute the map reduce job
-                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
-                        mapReduce.storeState(newApacheConfiguration);
-                        // map
-                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
-                        // combine TODO: is this really needed
-                        // reduce
-                        final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
-                        // write the map reduce output back to disk (memory)
-                        SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
-                    }
-                }
-                // update runtime and return the newly computed graph
-                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
-                return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
-            }
-        });
-    }
-
-    /////////////////
-
-    private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
-        if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
-            final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
-            if (null == hadoopGremlinLocalLibs)
-                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
-            else {
-                final String[] paths = hadoopGremlinLocalLibs.split(":");
-                for (final String path : paths) {
-                    final File file = new File(path);
-                    if (file.exists())
-                        Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
-                    else
-                        this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
-                }
-            }
-        }
-    }
-
-    public static void main(final String[] args) throws Exception {
-        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
-        new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
deleted file mode 100644
index 7141259..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
-
-    private List<Tuple2<K, V>> emissions = new ArrayList<>();
-
-    @Override
-    public void emit(final K key, final V value) {
-        this.emissions.add(new Tuple2<>(key, value));
-    }
-
-    public Iterator<Tuple2<K, V>> getEmissions() {
-        final Iterator<Tuple2<K,V>> iterator = this.emissions.iterator();
-        this.emissions = new ArrayList<>();
-        return iterator;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
deleted file mode 100644
index e2de405..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMemory implements Memory.Admin, Serializable {
-
-    public final Set<String> memoryKeys = new HashSet<>();
-    private final AtomicInteger iteration = new AtomicInteger(0);   // do these need to be atomics?
-    private final AtomicLong runtime = new AtomicLong(0l);
-    private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
-    private Broadcast<Map<String, Object>> broadcast;
-    private boolean inTask = false;
-
-    public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
-        if (null != vertexProgram) {
-            for (final String key : vertexProgram.getMemoryComputeKeys()) {
-                MemoryHelper.validateKey(key);
-                this.memoryKeys.add(key);
-            }
-        }
-        for (final MapReduce mapReduce : mapReducers) {
-            this.memoryKeys.add(mapReduce.getMemoryKey());
-        }
-        for (final String key : this.memoryKeys) {
-            this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), key, new RuleAccumulator()));
-        }
-        this.broadcast = sparkContext.broadcast(new HashMap<>());
-    }
-
-    @Override
-    public Set<String> keys() {
-        if (this.inTask)
-            return this.broadcast.getValue().keySet();
-        else {
-            final Set<String> trueKeys = new HashSet<>();
-            this.memory.forEach((key, value) -> {
-                if (value.value().getObject() != null)
-                    trueKeys.add(key);
-            });
-            return Collections.unmodifiableSet(trueKeys);
-        }
-    }
-
-    @Override
-    public void incrIteration() {
-        this.iteration.getAndIncrement();
-    }
-
-    @Override
-    public void setIteration(final int iteration) {
-        this.iteration.set(iteration);
-    }
-
-    @Override
-    public int getIteration() {
-        return this.iteration.get();
-    }
-
-    @Override
-    public void setRuntime(final long runTime) {
-        this.runtime.set(runTime);
-    }
-
-    @Override
-    public long getRuntime() {
-        return this.runtime.get();
-    }
-
-    @Override
-    public <R> R get(final String key) throws IllegalArgumentException {
-        final R r = this.getValue(key);
-        if (null == r)
-            throw Memory.Exceptions.memoryDoesNotExist(key);
-        else
-            return r;
-    }
-
-    @Override
-    public void incr(final String key, final long delta) {
-        checkKeyValue(key, delta);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.INCR, this.<Long>getValue(key) + delta));
-    }
-
-    @Override
-    public void and(final String key, final boolean bool) {
-        checkKeyValue(key, bool);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.AND, this.<Boolean>getValue(key) && bool));
-    }
-
-    @Override
-    public void or(final String key, final boolean bool) {
-        checkKeyValue(key, bool);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.OR, this.<Boolean>getValue(key) || bool));
-    }
-
-    @Override
-    public void set(final String key, final Object value) {
-        checkKeyValue(key, value);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.memoryString(this);
-    }
-
-    protected void setInTask(final boolean inTask) {
-        this.inTask = inTask;
-    }
-
-    protected void broadcastMemory(final JavaSparkContext sparkContext) {
-        this.broadcast.destroy(true); // do we need to block?
-        final Map<String, Object> toBroadcast = new HashMap<>();
-        this.memory.forEach((key, rule) -> {
-            if (null != rule.value().getObject())
-                toBroadcast.put(key, rule.value().getObject());
-        });
-        this.broadcast = sparkContext.broadcast(toBroadcast);
-    }
-
-    private void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryKeys.contains(key))
-            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
-        MemoryHelper.validateValue(value);
-    }
-
-    private <R> R getValue(final String key) {
-        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
deleted file mode 100644
index f52843b..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMessenger<M> implements Messenger<M> {
-
-    private Vertex vertex;
-    private Iterable<M> incomingMessages;
-    private List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
-
-    public void setVertexAndIncomingMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
-        this.vertex = vertex;
-        this.incomingMessages = incomingMessages;
-        this.outgoingMessages = new ArrayList<>();
-    }
-
-    public List<Tuple2<Object, M>> getOutgoingMessages() {
-        return this.outgoingMessages;
-    }
-
-    @Override
-    public Iterator<M> receiveMessages() {
-        return this.incomingMessages.iterator();
-    }
-
-    @Override
-    public void sendMessage(final MessageScope messageScope, final M message) {
-        if (messageScope instanceof MessageScope.Local) {
-            final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
-            final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
-            final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
-            incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
-        } else {
-            ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message)));
-        }
-    }
-
-    ///////////
-
-    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
-        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
-        return (T) incidentTraversal;
-    }
-
-    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
-        final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
-        return step.getDirection().opposite();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
deleted file mode 100644
index a5d0175..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
-
-    private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
-
-    @Override
-    public void emit(final OK key, final OV value) {
-        this.emissions.add(new Tuple2<>(key, value));
-    }
-
-    public Iterator<Tuple2<OK, OV>> getEmissions() {
-        final Iterator<Tuple2<OK, OV>> iterator = this.emissions.iterator();
-        this.emissions = new ArrayList<>();
-        return iterator;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
deleted file mode 100644
index 082c3b6..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class InputFormatRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
-        return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
-                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
-                NullWritable.class,
-                VertexWritable.class)
-                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
-                .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
deleted file mode 100644
index 75f602b..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
- * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface InputRDD {
-
-    /**
-     * Read the graphRDD from the underlying graph system.
-     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer}.
-     * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
-     * @return an adjacency list representation of the underlying graph system.
-     */
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
deleted file mode 100644
index 8d64fd2..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class OutputFormatRDD implements OutputRDD {
-
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null != outputLocation) {
-            // map back to a <nullwritable,vertexwritable> stream for output
-            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
-                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
-                            NullWritable.class,
-                            VertexWritable.class,
-                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
deleted file mode 100644
index 8f6ef7a..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface OutputRDD {
-
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
deleted file mode 100644
index 191c9ca..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class MessagePayload<M> implements Payload {
-
-    private final M message;
-
-    public MessagePayload(final M message) {
-        this.message = message;
-    }
-
-    public M getMessage() {
-        return this.message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
deleted file mode 100644
index d901df7..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import java.io.Serializable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface Payload extends Serializable {
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
deleted file mode 100644
index b236fe9..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewIncomingPayload<M> implements Payload {
-
-    private List<DetachedVertexProperty<Object>> view = null;
-    private final List<M> incomingMessages;
-
-
-    public ViewIncomingPayload() {
-        this.incomingMessages = null;
-    }
-
-    public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
-        this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
-    }
-
-    public ViewIncomingPayload(final ViewPayload viewPayload) {
-        this.incomingMessages = null;
-        this.view = viewPayload.getView();
-    }
-
-
-    public List<DetachedVertexProperty<Object>> getView() {
-        return null == this.view ? Collections.emptyList() : this.view;
-    }
-
-
-    public List<M> getIncomingMessages() {
-        return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
-    }
-
-    public boolean hasView() {
-        return null != view;
-    }
-
-    ////////////////////
-
-
-    private void mergeMessage(final M message, final MessageCombiner<M> messageCombiner) {
-        if (this.incomingMessages.isEmpty() || null == messageCombiner)
-            this.incomingMessages.add(message);
-        else
-            this.incomingMessages.set(0, messageCombiner.combine(this.incomingMessages.get(0), message));
-    }
-
-    private void mergeViewIncomingPayload(final ViewIncomingPayload<M> viewIncomingPayload, final MessageCombiner<M> messageCombiner) {
-        if (this.view == null)
-            this.view = viewIncomingPayload.view;
-        else
-            this.view.addAll(viewIncomingPayload.getView());
-
-        for (final M message : viewIncomingPayload.getIncomingMessages()) {
-            this.mergeMessage(message, messageCombiner);
-        }
-    }
-
-    public void mergePayload(final Payload payload, final MessageCombiner<M> messageCombiner) {
-        if (payload instanceof ViewPayload)
-            this.view = ((ViewPayload) payload).getView();
-        else if (payload instanceof MessagePayload)
-            this.mergeMessage(((MessagePayload<M>) payload).getMessage(), messageCombiner);
-        else if (payload instanceof ViewIncomingPayload)
-            this.mergeViewIncomingPayload((ViewIncomingPayload<M>) payload, messageCombiner);
-        else
-            throw new IllegalArgumentException("The provided payload is an unsupported merge payload: " + payload);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
deleted file mode 100644
index c4daf47..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-import scala.Tuple2;
-
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewOutgoingPayload<M> implements Payload {
-
-    private final List<DetachedVertexProperty<Object>> view;
-    private final List<Tuple2<Object,M>> outgoingMessages;
-
-    public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view, final List<Tuple2<Object,M>> outgoingMessages) {
-        this.view = view;
-        this.outgoingMessages = outgoingMessages;
-    }
-
-    public ViewPayload getView() {
-        return new ViewPayload(this.view);
-    }
-
-    public List<Tuple2<Object,M>> getOutgoingMessages() {
-        return this.outgoingMessages;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
deleted file mode 100644
index 0ec5ef5..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewPayload implements Payload {
-
-    private final List<DetachedVertexProperty<Object>> view;
-
-    public ViewPayload(final List<DetachedVertexProperty<Object>> view) {
-        this.view = view;
-    }
-
-    public List<DetachedVertexProperty<Object>> getView() {
-        return this.view;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 18d515e..3c3c9b7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopEdgeIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopVertexIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
@@ -192,10 +191,17 @@ public final class HadoopGraph implements Graph {
     public <C extends GraphComputer> C compute(final Class<C> graphComputerClass) {
         if (graphComputerClass.equals(GiraphGraphComputer.class))
             return (C) new GiraphGraphComputer(this);
-        else if (graphComputerClass.equals(SparkGraphComputer.class))
-            return (C) new SparkGraphComputer(this);
-        else
-            throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass);
+        else {
+            try {
+                return graphComputerClass.getConstructor(HadoopGraph.class).newInstance(this);
+            } catch (final Exception e) {
+                throw new IllegalArgumentException(e.getMessage(), e);
+            }
+        }
+        //else if (graphComputerClass.equals(SparkGraphComputer.class))
+        //    return (C) new SparkGraphComputer(this);
+        //else
+        //   throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
deleted file mode 100644
index af0d745..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.GraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@GraphProvider.Descriptor(computer = SparkGraphComputer.class)
-public final class HadoopSparkGraphProvider extends HadoopGraphProvider {
-
-    public GraphTraversalSource traversal(final Graph graph) {
-        return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
deleted file mode 100644
index ae729fd..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(ProcessComputerSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkGraphComputerProcessIntegrateTest {
-}