You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/03 20:50:30 UTC

[1/3] incubator-tinkerpop git commit: Spark is now isolated into spark-gremlin package. Tests are passing. A few hacks here and there just to get things building. However, it was pretty easy to split apart -- which is promising.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/hadoop_split [created] 04f5651e8


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
new file mode 100644
index 0000000..56a1297
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputFormatRDD.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * An {@link OutputRDD} that saves the graph RDD as a new-API Hadoop file using the
+ * {@link OutputFormat} class named in the configuration.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class OutputFormatRDD implements OutputRDD {
+
+    /**
+     * Saves the values of {@code graphRDD} beneath the configured output location. Nothing is
+     * written when the configuration has no output location.
+     *
+     * @param configuration the configuration that is converted to a Hadoop configuration and read
+     *                      for the output location and the graph output format
+     * @param graphRDD      the graph to write; the key of every pair is replaced by {@link NullWritable}
+     */
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        final org.apache.hadoop.conf.Configuration hadoopConf = ConfUtil.makeHadoopConfiguration(configuration);
+        final String location = hadoopConf.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null == location)
+            return; // without an output location there is nowhere to write
+
+        // map back to a <nullwritable,vertexwritable> stream for output
+        final JavaPairRDD<NullWritable, VertexWritable> outputRDD =
+                graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()));
+        final Class<OutputFormat<NullWritable, VertexWritable>> outputFormatClass =
+                (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConf.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class);
+        outputRDD.saveAsNewAPIHadoopFile(
+                location + "/" + Constants.HIDDEN_G,
+                NullWritable.class,
+                VertexWritable.class,
+                outputFormatClass,
+                hadoopConf);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
new file mode 100644
index 0000000..2580252
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDD.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * Contract for writing a graph, supplied as a {@link JavaPairRDD} whose values are
+ * {@link VertexWritable}s, to some output.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface OutputRDD {
+
+    /**
+     * Writes the supplied graph RDD.
+     *
+     * @param configuration the configuration made available to the writer
+     * @param graphRDD      the graph to write
+     */
+    void writeGraphRDD(Configuration configuration, JavaPairRDD<Object, VertexWritable> graphRDD);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
new file mode 100644
index 0000000..09e2599
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.payload;
+
+/**
+ * A {@link Payload} that carries a single message.
+ *
+ * @param <M> the type of the carried message
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MessagePayload<M> implements Payload {
+
+    private final M message;
+
+    /**
+     * @param message the message to carry; it is stored as given
+     */
+    public MessagePayload(final M message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the message supplied at construction
+     */
+    public M getMessage() {
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/Payload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/Payload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/Payload.java
new file mode 100644
index 0000000..5d85c61
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/Payload.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.payload;
+
+import java.io.Serializable;
+
+/**
+ * Common supertype of the payload classes in this package. It declares no methods and extends
+ * {@link Serializable}, so every payload implementation is serializable.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Payload extends Serializable {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
new file mode 100644
index 0000000..911fc7b
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.payload;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link Payload} that holds a view (a list of detached vertex properties) and a list of
+ * incoming messages. Other payloads are folded into it with
+ * {@link #mergePayload(Payload, MessageCombiner)}.
+ *
+ * @param <M> the message type
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ViewIncomingPayload<M> implements Payload {
+
+    private List<DetachedVertexProperty<Object>> view = null;
+    // May be null: the no-arg and ViewPayload constructors do not allocate a list. It is then
+    // created on the first merged message instead of failing with a NullPointerException.
+    private List<M> incomingMessages;
+
+    /**
+     * Creates a payload with neither a view nor a message list.
+     */
+    public ViewIncomingPayload() {
+        this.incomingMessages = null;
+    }
+
+    /**
+     * Creates a payload with no view and an empty message list.
+     *
+     * @param messageCombiner the combiner that later merges will use, or {@code null}; it only
+     *                        determines the initial capacity of the message list
+     */
+    public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
+        this.incomingMessages = newMessageList(messageCombiner);
+    }
+
+    /**
+     * Creates a payload whose view is the list held by {@code viewPayload} (not a copy) and that
+     * has no message list yet.
+     *
+     * @param viewPayload the payload supplying the view
+     */
+    public ViewIncomingPayload(final ViewPayload viewPayload) {
+        this.incomingMessages = null;
+        this.view = viewPayload.getView();
+    }
+
+    /**
+     * @return the view, or an empty list when no view has been set
+     */
+    public List<DetachedVertexProperty<Object>> getView() {
+        return null == this.view ? Collections.emptyList() : this.view;
+    }
+
+    /**
+     * @return the incoming messages, or an empty list when none have been merged
+     */
+    public List<M> getIncomingMessages() {
+        return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
+    }
+
+    /**
+     * @return {@code true} if a view has been set
+     */
+    public boolean hasView() {
+        return null != this.view;
+    }
+
+    ////////////////////
+
+    // With a combiner at most one (combined) message is ever stored, hence the capacity of 1.
+    private static <T> List<T> newMessageList(final MessageCombiner<T> messageCombiner) {
+        return null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
+    }
+
+    // Appends the message, or combines it into the single stored message when a combiner is given.
+    private void mergeMessage(final M message, final MessageCombiner<M> messageCombiner) {
+        if (null == this.incomingMessages)
+            this.incomingMessages = newMessageList(messageCombiner);
+
+        if (this.incomingMessages.isEmpty() || null == messageCombiner)
+            this.incomingMessages.add(message);
+        else
+            this.incomingMessages.set(0, messageCombiner.combine(this.incomingMessages.get(0), message));
+    }
+
+    // Adopts or extends the view with the other payload's view, then merges each of its messages.
+    private void mergeViewIncomingPayload(final ViewIncomingPayload<M> viewIncomingPayload, final MessageCombiner<M> messageCombiner) {
+        if (this.view == null)
+            this.view = viewIncomingPayload.view;
+        else
+            this.view.addAll(viewIncomingPayload.getView());
+
+        for (final M message : viewIncomingPayload.getIncomingMessages()) {
+            this.mergeMessage(message, messageCombiner);
+        }
+    }
+
+    /**
+     * Merges another payload into this one. A {@link ViewPayload} replaces the view, a
+     * {@link MessagePayload} contributes its message, and a {@link ViewIncomingPayload}
+     * contributes both its view and its messages.
+     *
+     * @param payload         the payload to merge
+     * @param messageCombiner the combiner used to collapse messages into one, or {@code null} to
+     *                        keep every message
+     * @throws IllegalArgumentException if {@code payload} is none of the supported payload types
+     */
+    public void mergePayload(final Payload payload, final MessageCombiner<M> messageCombiner) {
+        if (payload instanceof ViewPayload)
+            this.view = ((ViewPayload) payload).getView();
+        else if (payload instanceof MessagePayload)
+            this.mergeMessage(((MessagePayload<M>) payload).getMessage(), messageCombiner);
+        else if (payload instanceof ViewIncomingPayload)
+            this.mergeViewIncomingPayload((ViewIncomingPayload<M>) payload, messageCombiner);
+        else
+            throw new IllegalArgumentException("The provided payload is an unsupported merge payload: " + payload);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
new file mode 100644
index 0000000..fc4aeed
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.payload;
+
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+import scala.Tuple2;
+
+import java.util.List;
+
+/**
+ * A {@link Payload} that pairs a view (a list of detached vertex properties) with a list of
+ * outgoing messages, each message being a {@link Tuple2} of an object and a message.
+ *
+ * @param <M> the message type
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ViewOutgoingPayload<M> implements Payload {
+
+    private final List<DetachedVertexProperty<Object>> view;
+    private final List<Tuple2<Object, M>> outgoingMessages;
+
+    /**
+     * @param view             the view to carry; stored as given
+     * @param outgoingMessages the outgoing messages to carry; stored as given
+     */
+    public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view, final List<Tuple2<Object, M>> outgoingMessages) {
+        this.view = view;
+        this.outgoingMessages = outgoingMessages;
+    }
+
+    /**
+     * @return a new {@link ViewPayload} wrapping the view list held by this payload
+     */
+    public ViewPayload getView() {
+        return new ViewPayload(view);
+    }
+
+    /**
+     * @return the outgoing message list supplied at construction
+     */
+    public List<Tuple2<Object, M>> getOutgoingMessages() {
+        return outgoingMessages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
new file mode 100644
index 0000000..dc9376d
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.payload;
+
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+
+import java.util.List;
+
+/**
+ * A {@link Payload} that carries a view, i.e. a list of detached vertex properties.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ViewPayload implements Payload {
+
+    private final List<DetachedVertexProperty<Object>> view;
+
+    /**
+     * @param view the view to carry; the list is stored as given, not copied
+     */
+    public ViewPayload(final List<DetachedVertexProperty<Object>> view) {
+        this.view = view;
+    }
+
+    /**
+     * @return the view list supplied at construction
+     */
+    public List<DetachedVertexProperty<Object>> getView() {
+        return view;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java
new file mode 100644
index 0000000..f085678
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/HadoopGraphProvider.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopElement;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess;
+import org.apache.tinkerpop.gremlin.structure.io.script.ScriptResourceAccess;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class HadoopGraphProvider extends AbstractGraphProvider {
+
+    private static final Random RANDOM = new Random();
+    private boolean graphSONInput = false;
+
+    public static Map<String, String> PATHS = new HashMap<>();
+    private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{
+        add(HadoopEdge.class);
+        add(HadoopElement.class);
+        add(HadoopGraph.class);
+        add(HadoopProperty.class);
+        add(HadoopVertex.class);
+        add(HadoopVertexProperty.class);
+    }};
+
+    static {
+        try {
+            final List<String> kryoResources = Arrays.asList(
+                    "tinkerpop-modern.kryo",
+                    "grateful-dead.kryo",
+                    "tinkerpop-classic.kryo",
+                    "tinkerpop-crew.kryo");
+            for (final String fileName : kryoResources) {
+                PATHS.put(fileName, TestHelper.generateTempFileFromResource(GryoResourceAccess.class, fileName, "").getAbsolutePath());
+            }
+
+            final List<String> graphsonResources = Arrays.asList(
+                    "tinkerpop-modern.json",
+                    "grateful-dead.json",
+                    "tinkerpop-classic.json",
+                    "tinkerpop-crew.json");
+            for (final String fileName : graphsonResources) {
+                PATHS.put(fileName, TestHelper.generateTempFileFromResource(GraphSONResourceAccess.class, fileName, "").getAbsolutePath());
+            }
+
+            final List<String> scriptResources = Arrays.asList(
+                    "tinkerpop-classic.txt",
+                    "script-input.groovy",
+                    "script-output.groovy",
+                    "grateful-dead.txt",
+                    "script-input-grateful-dead.groovy",
+                    "script-output-grateful-dead.groovy");
+            for (final String fileName : scriptResources) {
+                PATHS.put(fileName, TestHelper.generateTempFileFromResource(ScriptResourceAccess.class, fileName, "").getAbsolutePath());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
+        this.graphSONInput = RANDOM.nextBoolean();
+        return new HashMap<String, Object>() {{
+            put(Graph.GRAPH, HadoopGraph.class.getName());
+            put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, graphSONInput ? GraphSONInputFormat.class.getCanonicalName() : GryoInputFormat.class.getCanonicalName());
+            put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+            put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
+            put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+            /// giraph configuration
+            put(GiraphConstants.MIN_WORKERS, 1);
+            put(GiraphConstants.MAX_WORKERS, 1);
+            put(GiraphConstants.SPLIT_MASTER_WORKER.getKey(), false);
+            put(GiraphConstants.ZOOKEEPER_SERVER_PORT.getKey(), 2181);  // you must have a local zookeeper running on this port
+            put(GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
+            put(GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
+            put(GiraphConstants.NUM_INPUT_THREADS.getKey(), 3);
+            put(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 3);
+            put(GiraphConstants.MAX_MASTER_SUPERSTEP_WAIT_MSECS.getKey(), TimeUnit.MINUTES.toMillis(60L));
+            put("mapred.reduce.tasks", 4);
+            //put("giraph.vertexOutputFormatThreadSafe", false);
+            //put("giraph.numOutputThreads", 3);
+
+            /// spark configuration
+            put("spark.master", "local[4]");
+            put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+            // put("spark.kryo.registrationRequired",true);
+        }};
+    }
+
+    @Override
+    public void clear(final Graph graph, final Configuration configuration) throws Exception {
+        if (graph != null)
+            graph.close();
+    }
+
+    @Override
+    public void loadGraphData(final Graph graph, final LoadGraphWith loadGraphWith, final Class testClass, final String testName) {
+        if (loadGraphWith != null) this.loadGraphDataViaHadoopConfig(graph, loadGraphWith.value());
+    }
+
+    @Override
+    public Set<Class> getImplementations() {
+        return IMPLEMENTATION;
+    }
+
+    public void loadGraphDataViaHadoopConfig(final Graph g, final LoadGraphWith.GraphData graphData) {
+        final String type = this.graphSONInput ? "json" : "kryo";
+
+        if (graphData.equals(LoadGraphWith.GraphData.GRATEFUL)) {
+            ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("grateful-dead." + type));
+        } else if (graphData.equals(LoadGraphWith.GraphData.MODERN)) {
+            ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-modern." + type));
+        } else if (graphData.equals(LoadGraphWith.GraphData.CLASSIC)) {
+            ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-classic." + type));
+        } else if (graphData.equals(LoadGraphWith.GraphData.CREW)) {
+            ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-crew." + type));
+        } else {
+            throw new RuntimeException("Could not load graph with " + graphData);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
new file mode 100644
index 0000000..c0c5bba
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/HadoopSparkGraphProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.GraphProvider;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+/**
+ * A {@link HadoopGraphProvider} whose traversal sources use a computer traversal engine backed by
+ * {@link SparkGraphComputer}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@GraphProvider.Descriptor(computer = SparkGraphComputer.class)
+public final class HadoopSparkGraphProvider extends HadoopGraphProvider {
+
+    /**
+     * Creates a traversal source for the graph that evaluates traversals with {@link SparkGraphComputer}.
+     *
+     * @param graph the graph to traverse
+     * @return a traversal source built with a {@link ComputerTraversalEngine}
+     */
+    public GraphTraversalSource traversal(final Graph graph) {
+        return GraphTraversalSource.build()
+                .engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class))
+                .create(graph);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessIntegrateTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessIntegrateTest.java
new file mode 100644
index 0000000..da32c8c
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessIntegrateTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs {@link ProcessComputerSuite} with {@link HadoopSparkGraphProvider} as the graph provider
+ * and {@link HadoopGraph} as the graph. The class body is intentionally empty: the suite and the
+ * provider are selected entirely by the annotations.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessComputerSuite.class)
+@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
+public class SparkGraphComputerProcessIntegrateTest {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
new file mode 100644
index 0000000..ec1f036
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.groovy;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
+import org.apache.tinkerpop.gremlin.spark.process.computer.HadoopSparkGraphProvider;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(GroovyProcessComputerSuite.class)
+@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
+public class SparkGraphComputerGroovyProcessIntegrateTest {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkHadoopGremlinPluginTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkHadoopGremlinPluginTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkHadoopGremlinPluginTest.java
new file mode 100644
index 0000000..ddfe2d9
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkHadoopGremlinPluginTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.groovy;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.spark.process.computer.HadoopSparkGraphProvider;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+//@RunWith(HadoopPluginSuite.class)
+@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
+public class SparkHadoopGremlinPluginTest {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
new file mode 100644
index 0000000..1fb85a1
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleInputRDD.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ExampleInputRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final List<Vertex> list = new ArrayList<>();
+        list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
+        list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
+        list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
+        list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
+        return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
new file mode 100644
index 0000000..bb38f7f
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/ExampleOutputRDD.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ExampleOutputRDD implements OutputRDD {
+    @Override
+    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+        int totalAge = 0;
+        final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
+        while (iterator.hasNext()) {
+            final Vertex vertex = iterator.next().get();
+            if (vertex.label().equals("person"))
+                totalAge = totalAge + vertex.<Integer>value("age");
+        }
+        assertEquals(123, totalAge);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
new file mode 100644
index 0000000..8f35b54
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InputOutputRDDTest {
+
+    @Test
+    public void shouldReadFromWriteToArbitraryRDD() throws Exception {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
new file mode 100644
index 0000000..8ac9ba3
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InputRDDTest {
+
+    @Test
+    public void shouldReadFromArbitraryRDD() {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
+        assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
new file mode 100644
index 0000000..4ded9bc
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class OutputRDDTest {
+
+    @Test
+    public void shouldWriteToArbitraryRDD() throws Exception {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, HadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        ////////
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+    }
+}


[2/3] incubator-tinkerpop git commit: Spark is now isolated into spark-gremlin package. Tests are passing. A few hacks here and there just to get things building. However, it was pretty easy to split apart -- which is promising.

Posted by ok...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
deleted file mode 100644
index cc07bac..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.HadoopSparkGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(GroovyProcessComputerSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkGraphComputerGroovyProcessIntegrateTest {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
deleted file mode 100644
index 3d83142..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopPluginSuite;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.HadoopSparkGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(HadoopPluginSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkHadoopGremlinPluginTest {
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
deleted file mode 100644
index d456808..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.T;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleInputRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final List<Vertex> list = new ArrayList<>();
-        list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
-        list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
-        list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
-        list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
-        return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
deleted file mode 100644
index 45ed114..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleOutputRDD implements OutputRDD {
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        int totalAge = 0;
-        final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
-        while (iterator.hasNext()) {
-            final Vertex vertex = iterator.next().get();
-            if (vertex.label().equals("person"))
-                totalAge = totalAge + vertex.<Integer>value("age");
-        }
-        assertEquals(123, totalAge);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
deleted file mode 100644
index ea1cb87..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * Runs a traversal on {@link SparkGraphComputer} with both ends of the job supplied as RDD
- * classes: {@link ExampleInputRDD} as the graph input and {@link ExampleOutputRDD} as the graph
- * output.
- * <p>
- * The test submits {@code g.V()} as a {@link TraversalVertexProgram} (result graph {@code NEW},
- * persist {@code EDGES}) against a {@code local[4]} Spark master and blocks on {@code get()}.
- * It contains no assertions of its own, so it can only fail through an exception raised while
- * the job runs.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputOutputRDDTest {
-
-    @Test
-    public void shouldReadFromWriteToArbitraryRDD() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
deleted file mode 100644
index c600327..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Checks that {@link SparkGraphComputer} can load its graph from an RDD class
- * ({@link ExampleInputRDD}) while the output side is configured as {@link GryoOutputFormat}.
- * <p>
- * Against a {@code local[4]} Spark master the test expects the sum of all {@code "age"} values
- * to be {@code 123.0} and the vertex count to be {@code 4}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputRDDTest {
-
-    @Test
-    public void shouldReadFromArbitraryRDD() {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
-        assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
deleted file mode 100644
index deea6ab..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * Runs a traversal on {@link SparkGraphComputer} whose input is the location stored under
- * {@code "tinkerpop-modern.kryo"} in {@link HadoopGraphProvider#PATHS}, read through
- * {@link GryoInputFormat}, and whose graph output is handed to {@link ExampleOutputRDD}.
- * <p>
- * The test submits {@code g.V()} as a {@link TraversalVertexProgram} (result graph {@code NEW},
- * persist {@code EDGES}) against a {@code local[4]} Spark master and blocks on {@code get()}.
- * It contains no assertions of its own, so it can only fail through an exception raised while
- * the job runs.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class OutputRDDTest {
-
-    @Test
-    public void shouldWriteToArbitraryRDD() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, HadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 769f84f..7a4411c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@ limitations under the License.
         <module>gremlin-driver</module>
         <module>gremlin-console</module>
         <module>gremlin-server</module>
+        <module>spark-gremlin</module>
     </modules>
     <scm>
         <connection>scm:git:git@git-wip-us.apache.org:repos/asf/incubator-tinkerpop.git</connection>

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
new file mode 100644
index 0000000..277ce11
--- /dev/null
+++ b/spark-gremlin/pom.xml
@@ -0,0 +1,249 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.tinkerpop</groupId>
+        <artifactId>tinkerpop</artifactId>
+        <version>3.1.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>spark-gremlin</artifactId>
+    <name>Apache TinkerPop :: Spark Gremlin</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-groovy</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>hadoop-gremlin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- SPARK GRAPH COMPUTER -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <version>1.2.1</version>
+            <exclusions>
+                <!-- self conflicts -->
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-lang3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <!-- gremlin-core conflicts -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>jcl-over-slf4j</artifactId>
+                </exclusion>
+                <!-- gremlin-groovy conflicts -->
+                <exclusion>
+                    <groupId>jline</groupId>
+                    <artifactId>jline</artifactId>
+                </exclusion>
+                <!-- hadoop conflicts -->
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <!-- giraph conflicts -->
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <!-- lgpl conflicts -->
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>findbugs</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- consistent dependencies -->
+<!--        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.10.3</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>3.5.13.Final</version>
+        </dependency>
+-->
+        <!-- TEST -->
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-test</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-groovy-test</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>tinkergraph-gremlin</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <repositories>
+        <repository>
+            <id>hyracks-releases</id>
+            <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public-releases/</url>
+        </repository>
+    </repositories>
+    <build>
+        <directory>${basedir}/target</directory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>${basedir}/src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>build-detached-assemblies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <attach>false</attach>
+                            <descriptors>
+                                <descriptor>src/assembly/standalone.xml</descriptor>
+                                <descriptor>src/assembly/hadoop-job.xml</descriptor>
+                            </descriptors>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.gmavenplus</groupId>
+                <artifactId>gmavenplus-plugin</artifactId>
+                <version>1.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>addSources</goal>
+                            <goal>addTestSources</goal>
+                            <goal>generateStubs</goal>
+                            <goal>compile</goal>
+                            <goal>testGenerateStubs</goal>
+                            <goal>testCompile</goal>
+                            <goal>removeStubs</goal>
+                            <goal>removeTestStubs</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <invokeDynamic>true</invokeDynamic>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+                <configuration>
+                    <archive>
+                        <manifestEntries>
+                            <Gremlin-Plugin-Dependencies>org.apache.hadoop:hadoop-core:1.2.1
+                            </Gremlin-Plugin-Dependencies>
+                            <!-- deletes the servlet-api jar from the path after install - causes conflicts -->
+                            <Gremlin-Plugin-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+                            </Gremlin-Plugin-Paths>
+                            <Gremlin-Lib-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+                            </Gremlin-Lib-Paths>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <!-- <log4j.configuration>log4j-core.properties</log4j.configuration> -->
+                    <!--<argLine>-Xmx2048M</argLine>-->
+                    <excludes>
+                        <exclude>**/*IntegrateTest.java</exclude>
+                        <exclude>**/*PerformanceTest.java</exclude>
+                        <!-- this is technically a member of the integration test suite -->
+                        <exclude>**/HadoopGremlinPluginTest.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
new file mode 100644
index 0000000..0b04300
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.groovy.plugin;
+
+/**
+ * Empty placeholder class: it declares no fields or methods and extends or implements nothing.
+ * NOTE(review): presumably reserved for a future Gremlin plugin for Spark -- confirm.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkGremlinPlugin {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
new file mode 100644
index 0000000..bebd283
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.spark.AccumulatorParam;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+
+/**
+ * Spark {@link AccumulatorParam} that merges {@link Rule} values; a {@code NO_OP} rule is the identity.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RuleAccumulator implements AccumulatorParam<Rule> {
+    /** Merges {@code b} into {@code a}; uses the same rules as {@link #addInPlace(Rule, Rule)}. */
+    @Override
+    public Rule addAccumulator(final Rule a, final Rule b) {
+        return merge(a, b);
+    }
+
+    /** Merges {@code b} into {@code a}; uses the same rules as {@link #addAccumulator(Rule, Rule)}. */
+    @Override
+    public Rule addInPlace(final Rule a, final Rule b) {
+        return merge(a, b);
+    }
+
+    /** Returns a fresh {@code NO_OP} rule holding {@code null}; the argument is ignored. */
+    @Override
+    public Rule zero(final Rule rule) {
+        return new Rule(Rule.Operation.NO_OP, null);
+    }
+
+    /** A NO_OP side yields the other rule as is; otherwise b's operation combines both objects. */
+    private static Rule merge(final Rule a, final Rule b) {
+        if (a.getOperation().equals(Rule.Operation.NO_OP))
+            return b;
+        if (b.getOperation().equals(Rule.Operation.NO_OP))
+            return a;
+        return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
new file mode 100644
index 0000000..288538f
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -0,0 +1,202 @@
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import com.google.common.base.Optional;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkExecutor {
+
+    // shared zero-length array used when a vertex program declares no element compute keys
+    private static final String[] EMPTY_ARRAY = new String[0];
+
+    // every member of this class is static; the private constructor prevents instantiation
+    private SparkExecutor() {
+    }
+
+    ////////////////////
+    // VERTEX PROGRAM //
+    ////////////////////
+
+    /**
+     * Runs one iteration of the vertex program over every vertex of the graph and "passes messages" by
+     * reducing the resulting views and outgoing messages on their key.
+     * <p>
+     * The returned RDD is evaluated eagerly (via a {@code foreachPartition} call) before this method returns.
+     *
+     * @param graphRDD            the vertices of the graph (the key is presumably the vertex id -- confirm with the input RDD)
+     * @param viewIncomingRDD     the views and incoming messages produced by the previous iteration, or {@code null} on the first iteration
+     * @param memory              the memory given to the vertex program; worker start/end callbacks receive {@code memory.asImmutable()}
+     * @param apacheConfiguration the configuration used to initialize {@code HadoopPools} and to create the vertex program on each partition
+     * @param <M>                 the message type of the vertex program
+     * @return for each vertex that has a view, its new view together with the messages sent to it during this iteration
+     */
+    public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
+            final JavaPairRDD<Object, VertexWritable> graphRDD,
+            final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
+            final SparkMemory memory,
+            final Configuration apacheConfiguration) {
+
+        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
+                graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
+                graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
+                // for each partition of vertices
+                .mapPartitionsToPair(partitionIterator -> {
+                    HadoopPools.initialize(apacheConfiguration);
+                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
+                    final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
+                    final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+                    final SparkMessenger<M> messenger = new SparkMessenger<>();
+                    workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
+                    return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+                        final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
+                        // drop any compute properties that are cached in memory
+                        if (elementComputeKeysArray.length > 0)
+                            vertex.dropVertexProperties(elementComputeKeysArray);
+                        final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
+                        final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
+                        final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
+                        previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
+                        ///
+                        messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
+                        workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
+                        ///
+                        final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
+                                Collections.emptyList() :
+                                IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
+                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+                        // NOTE(review): this check only runs while mapping a vertex, so a partition without vertices appears to never reach workerIterationEnd() -- confirm
+                        if (!partitionIterator.hasNext())
+                            workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
+                        return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+                    });
+                })).setName("viewOutgoingRDD");
+
+        // "message pass" by reducing on the vertex object id of the view and message payloads
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
+        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
+                .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
+                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
+                .reduceByKey((a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+                    if (a instanceof ViewIncomingPayload) {
+                        ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
+                        return a;
+                    } else if (b instanceof ViewIncomingPayload) {
+                        ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
+                        return b;
+                    } else {
+                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
+                        c.mergePayload(a, messageCombiner);
+                        c.mergePayload(b, messageCombiner);
+                        return c;
+                    }
+                })
+                .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
+                .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
+                .mapValues(payload -> payload instanceof ViewIncomingPayload ?
+                        (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
+                        new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
+
+        newViewIncomingRDD.setName("viewIncomingRDD")
+                .foreachPartition(partitionIterator -> {
+                    HadoopPools.initialize(apacheConfiguration);
+                }); // need to complete a task so it's BSP and the memory for this iteration is updated
+        return newViewIncomingRDD;
+    }
+
+    /////////////////
+    // MAP REDUCE //
+    ////////////////
+
+    /**
+     * Produces the graph that the map-reduce jobs operate on: every vertex has its edges dropped and,
+     * when a vertex program ran, its compute-key properties replaced by those of the final view.
+     *
+     * @param graphRDD           the vertices of the graph
+     * @param viewIncomingRDD    the final view of the vertex program, or {@code null} if no vertex program ran
+     * @param elementComputeKeys the vertex property keys to drop before the view is attached
+     * @param <M>                the message type of the vertex program
+     * @return the edge-less vertices carrying the view properties
+     */
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
+        // there was no vertex program: only the edges have to go
+        if (null == viewIncomingRDD) {
+            return graphRDD.mapValues(writable -> {
+                writable.get().dropEdges();
+                return writable;
+            });
+        }
+        // a vertex without an entry in the view simply gets no properties attached
+        return graphRDD.leftOuterJoin(viewIncomingRDD).mapValues(vertexAndView -> {
+            final StarGraph.StarVertex starVertex = vertexAndView._1().get();
+            starVertex.dropEdges();
+            starVertex.dropVertexProperties(elementComputeKeys);
+            if (vertexAndView._2().isPresent()) {
+                for (final DetachedVertexProperty<Object> property : vertexAndView._2().get().getView()) {
+                    property.attach(Attachable.Method.create(starVertex));
+                }
+            }
+            return vertexAndView._1();
+        });
+    }
+
+    /**
+     * Runs the map stage of a map-reduce job over every vertex of the graph.
+     * A worker-local map-reduce instance is created from the configuration on each partition.
+     *
+     * @param graphRDD            the vertices to map over
+     * @param mapReduce           consulted for the optional map key sort
+     * @param apacheConfiguration the configuration used to initialize {@code HadoopPools} and to create the worker map-reduce
+     * @return the emitted key/value pairs, sorted by key when the job defines a map key sort
+     */
+    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+        final JavaPairRDD<K, V> unsortedRDD = graphRDD.mapPartitionsToPair(vertices -> {
+            HadoopPools.initialize(apacheConfiguration);
+            final MapReduce<K, V, ?, ?, ?> worker = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+            worker.workerStart(MapReduce.Stage.MAP);
+            final SparkMapEmitter<K, V> emitter = new SparkMapEmitter<>();
+            return () -> IteratorUtils.flatMap(vertices, idAndVertex -> {
+                worker.map(ComputerGraph.mapReduce(idAndVertex._2().get()), emitter);
+                // the worker is ended right after the last vertex of the partition was mapped
+                if (!vertices.hasNext()) {
+                    worker.workerEnd(MapReduce.Stage.MAP);
+                }
+                return emitter.getEmissions();
+            });
+        });
+        return mapReduce.getMapKeySort().isPresent() ?
+                unsortedRDD.sortByKey(mapReduce.getMapKeySort().get()) :
+                unsortedRDD;
+    }
+
+    // TODO: public static executeCombine()  is this necessary?  YES --- we groupByKey in reduce() where we want to combine first.
+
+    /**
+     * Runs the reduce stage of a map-reduce job: the map output is grouped by key and every group is reduced.
+     * A worker-local map-reduce instance is created from the configuration on each partition.
+     *
+     * @param mapRDD              the key/value pairs emitted by the map stage
+     * @param mapReduce           consulted for the optional reduce key sort
+     * @param apacheConfiguration the configuration used to initialize {@code HadoopPools} and to create the worker map-reduce
+     * @return the reduced key/value pairs, sorted by key when the job defines a reduce key sort
+     */
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+        final JavaPairRDD<OK, OV> unsortedRDD = mapRDD.groupByKey().mapPartitionsToPair(groups -> {
+            HadoopPools.initialize(apacheConfiguration);
+            final MapReduce<K, V, OK, OV, ?> worker = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+            worker.workerStart(MapReduce.Stage.REDUCE);
+            final SparkReduceEmitter<OK, OV> emitter = new SparkReduceEmitter<>();
+            return () -> IteratorUtils.flatMap(groups, keyAndValues -> {
+                worker.reduce(keyAndValues._1(), keyAndValues._2().iterator(), emitter);
+                // the worker is ended right after the last group of the partition was reduced
+                if (!groups.hasNext()) {
+                    worker.workerEnd(MapReduce.Stage.REDUCE);
+                }
+                return emitter.getEmissions();
+            });
+        });
+        return mapReduce.getReduceKeySort().isPresent() ?
+                unsortedRDD.sortByKey(mapReduce.getReduceKeySort().get()) :
+                unsortedRDD;
+    }
+
+    ///////////////////
+    // Input/Output //
+    //////////////////
+
+    /**
+     * Writes the result of a map-reduce job as a sequence file below the configured output location
+     * (in a sub-path named after the job's memory key) and then reads that file back to add the result to memory.
+     * Nothing happens when no output location is configured.
+     *
+     * @param mapReduceRDD        the final key/value pairs of the job
+     * @param mapReduce           the job; supplies the memory key and adds the result to memory
+     * @param memory              the memory that receives the result
+     * @param hadoopConfiguration the configuration holding the output location; also used for writing and reading the file
+     * @throws IllegalStateException if reading the written result fails with an {@link IOException}
+     */
+    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        if (null == outputLocation) {
+            return;
+        }
+        // map back to a Hadoop stream for output
+        mapReduceRDD
+                .mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2())))
+                .saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(), ObjectWritable.class, ObjectWritable.class, SequenceFileOutputFormat.class, hadoopConfiguration);
+        // TODO: mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
+        final Path resultPath = new Path(outputLocation + "/" + mapReduce.getMemoryKey());
+        try {
+            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, resultPath));
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
new file mode 100644
index 0000000..3aa4383
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
+
+    /**
+     * Creates a graph computer for the given graph.
+     *
+     * @param hadoopGraph the graph to compute over; passed on to the superclass
+     */
+    public SparkGraphComputer(final HadoopGraph hadoopGraph) {
+        super(hadoopGraph);
+    }
+
+    /**
+     * Validates the computer's state, prepares the configurations and then asynchronously executes the
+     * vertex program (if any) followed by the map reducers (if any) on a freshly created Spark context.
+     * <p>
+     * The asynchronous part deletes the configured output location, reads the graph RDD, iterates the
+     * vertex program until it terminates, optionally writes the graph RDD, runs each map reducer and saves
+     * its result. The Spark context is closed when that part ends, normally or exceptionally.
+     *
+     * @return a future holding the result graph and the final, immutable memory; failures of the asynchronous part surface through this future
+     * @throws IllegalStateException if resolving the input location of a file-based input format fails with an {@link IOException}
+     */
+    @Override
+    public Future<ComputerResult> submit() {
+        super.validateStatePriorToExecution();
+        // apache and hadoop configurations that are used throughout the graph computer computation
+        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
+        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+        // for file-based input formats, resolve the input location through the file system and store it as the input directory in both configurations
+        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+            try {
+                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+                apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
+                hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        // create the completable future
+        return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+            final long startTime = System.currentTimeMillis();
+            SparkMemory memory = null;
+            // delete output location
+            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+            if (null != outputLocation) {
+                try {
+                    FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+            }
+            // wire up a spark context
+            final SparkConf sparkConfiguration = new SparkConf();
+            sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
+                    /*final List<Class> classes = new ArrayList<>();
+                    classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
+                    classes.addAll(IOClasses.getSharedHadoopClasses());
+                    classes.add(ViewPayload.class);
+                    classes.add(MessagePayload.class);
+                    classes.add(ViewIncomingPayload.class);
+                    classes.add(ViewOutgoingPayload.class);
+                    sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
+
+            // create the spark configuration from the graph computer configuration
+            hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+            // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
+            try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+                // add the project jars to the cluster
+                this.loadJars(sparkContext, hadoopConfiguration);
+                // create a message-passing friendly rdd from the input rdd
+                final JavaPairRDD<Object, VertexWritable> graphRDD;
+                try {
+                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+                            .newInstance()
+                            .readGraphRDD(apacheConfiguration, sparkContext)
+                            .setName("graphRDD")
+                            .cache();
+                } catch (final InstantiationException | IllegalAccessException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+                JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
+
+                ////////////////////////////////
+                // process the vertex program //
+                ////////////////////////////////
+                if (null != this.vertexProgram) {
+                    // set up the vertex program and wire up configurations
+                    memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
+                    this.vertexProgram.setup(memory);
+                    memory.broadcastMemory(sparkContext);
+                    final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
+                    this.vertexProgram.storeState(vertexProgramConfiguration);
+                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+                    ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+
+                    // execute the vertex program
+                    while (true) {
+                        memory.setInTask(true);
+                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+                        memory.setInTask(false);
+                        if (this.vertexProgram.terminate(memory))
+                            break;
+                        else {
+                            memory.incrIteration();
+                            memory.broadcastMemory(sparkContext);
+                        }
+                    }
+                    // write the graph rdd using the output rdd
+                    if (!this.persist.equals(GraphComputer.Persist.NOTHING)) {
+                        try {
+                            hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
+                                    .newInstance()
+                                    .writeGraphRDD(apacheConfiguration, graphRDD);
+                        } catch (final InstantiationException | IllegalAccessException e) {
+                            throw new IllegalStateException(e.getMessage(), e);
+                        }
+                    }
+                }
+
+                // copy the Spark memory (if a vertex program ran) into a map-backed memory for the map reducers and the result
+                final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
+
+                //////////////////////////////
+                // process the map reducers //
+                //////////////////////////////
+                if (!this.mapReducers.isEmpty()) {
+                    final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
+                    final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
+                    for (final MapReduce mapReduce : this.mapReducers) {
+                        // execute the map reduce job
+                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                        mapReduce.storeState(newApacheConfiguration);
+                        // map
+                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
+                        // combine TODO: is this really needed
+                        // reduce
+                        final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
+                        // write the map reduce output back to disk (memory)
+                        SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+                    }
+                }
+                // update runtime and return the newly computed graph
+                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
+                return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
+            }
+        });
+    }
+
+    /////////////////
+
+    /**
+     * Adds every jar found in the directories named by the {@code HADOOP_GREMLIN_LIBS} environment variable
+     * (a ':'-separated list) to the Spark context. Does nothing when the distributed-cache setting is disabled
+     * in the configuration; a missing variable or an unusable path only produces a warning.
+     *
+     * @param sparkContext        the context that receives the jars
+     * @param hadoopConfiguration the configuration holding the distributed-cache setting
+     */
+    private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
+        if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true))
+            return;
+        final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+        if (null == hadoopGremlinLocalLibs) {
+            this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            return;
+        }
+        for (final String path : hadoopGremlinLocalLibs.split(":")) {
+            // listFiles() returns null when the path does not exist, is not a directory or cannot be read;
+            // checking only exists() let a regular file through and failed with a NullPointerException
+            final File[] files = new File(path).listFiles();
+            if (null == files) {
+                this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
+                continue;
+            }
+            Stream.of(files).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
+        }
+    }
+
+    /**
+     * Command-line entry point: opens the graph described by a properties file, creates the vertex program
+     * from the same configuration, submits it and blocks until the computation is done.
+     *
+     * @param args {@code args[0]} is the location of the properties file
+     * @throws IllegalArgumentException if no properties file is given
+     * @throws Exception                if loading the configuration or the computation fails
+     */
+    public static void main(final String[] args) throws Exception {
+        // fail with a usage hint instead of an ArrayIndexOutOfBoundsException when the argument is missing
+        if (null == args || args.length < 1)
+            throw new IllegalArgumentException("Usage: " + SparkGraphComputer.class.getSimpleName() + " <properties-file>");
+        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
new file mode 100644
index 0000000..cf31249
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
+
+    // the pairs emitted since the last call to getEmissions()
+    private List<Tuple2<K, V>> emissions = new ArrayList<>();
+
+    /**
+     * Buffers the key/value pair until it is picked up through {@link #getEmissions()}.
+     */
+    @Override
+    public void emit(final K key, final V value) {
+        final Tuple2<K, V> pair = new Tuple2<>(key, value);
+        this.emissions.add(pair);
+    }
+
+    /**
+     * Hands out the pairs buffered so far and starts a new, empty buffer.
+     *
+     * @return an iterator over the pairs emitted since the previous call
+     */
+    public Iterator<Tuple2<K, V>> getEmissions() {
+        final List<Tuple2<K, V>> drained = this.emissions;
+        this.emissions = new ArrayList<>();
+        return drained.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
new file mode 100644
index 0000000..dbfadcc
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@link Memory.Admin} whose values are held in one Spark {@link Accumulator} of {@link Rule} per memory key
+ * and mirrored into a {@link Broadcast} map by {@link #broadcastMemory(JavaSparkContext)}.
+ * <p>
+ * The {@code inTask} flag selects the access path: while it is set, reads come from the broadcast map and
+ * writes are added to the accumulators; while it is clear, reads and writes go straight to the accumulator values.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMemory implements Memory.Admin, Serializable {
+
+    public final Set<String> memoryKeys = new HashSet<>();
+    private final AtomicInteger iteration = new AtomicInteger(0);   // do these need to be atomics?
+    private final AtomicLong runtime = new AtomicLong(0L);
+    private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
+    private Broadcast<Map<String, Object>> broadcast;
+    private boolean inTask = false;
+
+    /**
+     * Registers the memory keys of the vertex program and of every map reducer, creates one accumulator per key
+     * (initialised to a {@code NO_OP} rule holding {@code null}) and broadcasts an initially empty map.
+     *
+     * @param vertexProgram the vertex program providing the memory compute keys, may be {@code null}
+     * @param mapReducers   the map reducers whose memory keys are registered as well
+     * @param sparkContext  the context used to create the accumulators and the broadcast
+     */
+    public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
+        if (null != vertexProgram) {
+            for (final String key : vertexProgram.getMemoryComputeKeys()) {
+                MemoryHelper.validateKey(key);
+                this.memoryKeys.add(key);
+            }
+        }
+        for (final MapReduce mapReduce : mapReducers) {
+            this.memoryKeys.add(mapReduce.getMemoryKey());
+        }
+        for (final String key : this.memoryKeys) {
+            this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), key, new RuleAccumulator()));
+        }
+        this.broadcast = sparkContext.broadcast(new HashMap<>());
+    }
+
+    /**
+     * @return a read-only set of the keys that currently hold a non-null value: the broadcast keys while in a task,
+     * otherwise the keys whose accumulator holds a non-null object
+     */
+    @Override
+    public Set<String> keys() {
+        // both branches hand out a read-only set so callers cannot mutate the broadcast map through its key view
+        if (this.inTask)
+            return Collections.unmodifiableSet(this.broadcast.getValue().keySet());
+        final Set<String> trueKeys = new HashSet<>();
+        this.memory.forEach((key, value) -> {
+            if (value.value().getObject() != null)
+                trueKeys.add(key);
+        });
+        return Collections.unmodifiableSet(trueKeys);
+    }
+
+    @Override
+    public void incrIteration() {
+        this.iteration.getAndIncrement();
+    }
+
+    @Override
+    public void setIteration(final int iteration) {
+        this.iteration.set(iteration);
+    }
+
+    @Override
+    public int getIteration() {
+        return this.iteration.get();
+    }
+
+    @Override
+    public void setRuntime(final long runTime) {
+        this.runtime.set(runTime);
+    }
+
+    @Override
+    public long getRuntime() {
+        return this.runtime.get();
+    }
+
+    /**
+     * @param key the memory key to look up
+     * @return the value currently associated with the key
+     * @throws IllegalArgumentException if no value exists for the key
+     */
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        final R r = this.getValue(key);
+        if (null == r)
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        else
+            return r;
+    }
+
+    /**
+     * Adds {@code delta} to the value of the key. Outside of a task a key without a value is treated as
+     * not yet incremented, so the result is {@code delta} itself.
+     */
+    @Override
+    public void incr(final String key, final long delta) {
+        checkKeyValue(key, delta);
+        final Accumulator<Rule> accumulator = this.memory.get(key);
+        if (this.inTask)
+            accumulator.add(new Rule(Rule.Operation.INCR, delta));
+        else {
+            // the accumulator starts out holding null; unboxing it directly would throw a NullPointerException
+            final Long current = this.getValue(key);
+            accumulator.setValue(new Rule(Rule.Operation.INCR, null == current ? delta : current + delta));
+        }
+    }
+
+    /**
+     * Logically ANDs {@code bool} into the value of the key. Outside of a task a key without a value
+     * simply takes {@code bool}.
+     */
+    @Override
+    public void and(final String key, final boolean bool) {
+        checkKeyValue(key, bool);
+        final Accumulator<Rule> accumulator = this.memory.get(key);
+        if (this.inTask)
+            accumulator.add(new Rule(Rule.Operation.AND, bool));
+        else {
+            // null-safe for the same reason as incr(): no value has been set yet
+            final Boolean current = this.getValue(key);
+            accumulator.setValue(new Rule(Rule.Operation.AND, null == current ? bool : current && bool));
+        }
+    }
+
+    /**
+     * Logically ORs {@code bool} into the value of the key. Outside of a task a key without a value
+     * simply takes {@code bool}.
+     */
+    @Override
+    public void or(final String key, final boolean bool) {
+        checkKeyValue(key, bool);
+        final Accumulator<Rule> accumulator = this.memory.get(key);
+        if (this.inTask)
+            accumulator.add(new Rule(Rule.Operation.OR, bool));
+        else {
+            // null-safe for the same reason as incr(): no value has been set yet
+            final Boolean current = this.getValue(key);
+            accumulator.setValue(new Rule(Rule.Operation.OR, null == current ? bool : current || bool));
+        }
+    }
+
+    /**
+     * Sets the value of the key: added to the accumulator as a {@code SET} rule while in a task,
+     * otherwise written directly as the accumulator's value.
+     */
+    @Override
+    public void set(final String key, final Object value) {
+        checkKeyValue(key, value);
+        if (this.inTask)
+            this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
+        else
+            this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.memoryString(this);
+    }
+
+    /**
+     * Switches between the in-task access path (broadcast reads, accumulator adds) and the direct one.
+     */
+    protected void setInTask(final boolean inTask) {
+        this.inTask = inTask;
+    }
+
+    /**
+     * Destroys the previous broadcast and broadcasts a fresh snapshot of every non-null accumulator value.
+     *
+     * @param sparkContext the context used to create the new broadcast
+     */
+    protected void broadcastMemory(final JavaSparkContext sparkContext) {
+        this.broadcast.destroy(true); // do we need to block?
+        final Map<String, Object> toBroadcast = new HashMap<>();
+        this.memory.forEach((key, rule) -> {
+            final Object object = rule.value().getObject();
+            if (null != object)
+                toBroadcast.put(key, object);
+        });
+        this.broadcast = sparkContext.broadcast(toBroadcast);
+    }
+
+    // rejects keys that were not registered at construction time and values that MemoryHelper considers invalid
+    private void checkKeyValue(final String key, final Object value) {
+        if (!this.memoryKeys.contains(key))
+            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
+        MemoryHelper.validateValue(value);
+    }
+
+    // the caller chooses R; the stored objects are untyped, hence the unchecked cast
+    @SuppressWarnings("unchecked")
+    private <R> R getValue(final String key) {
+        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
new file mode 100644
index 0000000..f32c684
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link Messenger} that serves the incoming messages of the current vertex and collects every sent message
+ * as a (target vertex id, message) {@link Tuple2} in an in-memory list.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMessenger<M> implements Messenger<M> {
+
+    private Vertex vertex;
+    private Iterable<M> incomingMessages;
+    private List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
+
+    /**
+     * Points the messenger at a vertex and its incoming messages and starts a fresh outgoing message list.
+     *
+     * @param vertex           the vertex that local message scopes start from
+     * @param incomingMessages the messages returned by {@link #receiveMessages()}
+     */
+    public void setVertexAndIncomingMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
+        this.outgoingMessages = new ArrayList<>();
+        this.incomingMessages = incomingMessages;
+        this.vertex = vertex;
+    }
+
+    /**
+     * @return the messages sent since the vertex was last set, keyed by the id of the receiving vertex
+     */
+    public List<Tuple2<Object, M>> getOutgoingMessages() {
+        return this.outgoingMessages;
+    }
+
+    @Override
+    public Iterator<M> receiveMessages() {
+        return this.incomingMessages.iterator();
+    }
+
+    /**
+     * Queues the message once per receiver: every vertex listed by a global scope, or for a local scope the
+     * vertex on the opposite end of every edge produced by the scope's incident traversal.
+     */
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        if (!(messageScope instanceof MessageScope.Local)) {
+            // global scope: the receivers are listed explicitly
+            for (final Vertex receiver : ((MessageScope.Global) messageScope).vertices()) {
+                this.outgoingMessages.add(new Tuple2<>(receiver.id(), message));
+            }
+            return;
+        }
+        // local scope: walk the incident edges of the current vertex
+        final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
+        final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
+        final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
+        incidentTraversal.forEachRemaining(edge -> {
+            final Object receiverId = edge.vertices(direction).next().id();
+            this.outgoingMessages.add(new Tuple2<>(receiverId, message));
+        });
+    }
+
+    ///////////
+
+    // prepends a start step holding the vertex so the incident traversal begins at that vertex
+    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
+        final Traversal.Admin<Vertex, Edge> admin = incidentTraversal.asAdmin();
+        admin.addStep(0, new StartStep<>(admin, vertex));
+        return (T) incidentTraversal;
+    }
+
+    // the opposite of the direction of the traversal's last vertex step
+    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
+        final VertexStep lastVertexStep = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
+        final Direction stepDirection = lastVertexStep.getDirection();
+        return stepDirection.opposite();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
new file mode 100644
index 0000000..e2252fa
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link MapReduce.ReduceEmitter} that buffers every emitted key/value pair in memory as a {@link Tuple2}
+ * until the buffer is handed off through {@link #getEmissions()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
+
+    // pairs emitted since the last call to getEmissions()
+    private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
+
+    /**
+     * Appends the key/value pair to the current buffer.
+     *
+     * @param key   the emitted key
+     * @param value the emitted value
+     */
+    @Override
+    public void emit(final OK key, final OV value) {
+        final Tuple2<OK, OV> pair = new Tuple2<>(key, value);
+        this.emissions.add(pair);
+    }
+
+    /**
+     * Hands off the buffered pairs and starts a fresh, empty buffer for subsequent emits.
+     *
+     * @return an iterator over the pairs emitted since the previous call
+     */
+    public Iterator<Tuple2<OK, OV>> getEmissions() {
+        final List<Tuple2<OK, OV>> drained = this.emissions;
+        this.emissions = new ArrayList<>();
+        return drained.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
new file mode 100644
index 0000000..adb080b
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * An {@link InputRDD} that generates the graphRDD through the Hadoop {@link InputFormat} class configured under
+ * {@link Constants#GREMLIN_HADOOP_GRAPH_INPUT_FORMAT}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputFormatRDD implements InputRDD {
+
+    /**
+     * Reads the vertices through the configured input format and keys each one by its vertex id.
+     *
+     * @param configuration the configuration that is converted into the Hadoop configuration used for reading
+     * @param sparkContext  the Spark context used to create the RDD
+     * @return the vertices keyed by vertex id, with a single entry kept per id
+     */
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+        final Class<InputFormat<NullWritable, VertexWritable>> inputFormatClass =
+                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class);
+        final JavaPairRDD<NullWritable, VertexWritable> rawRDD =
+                sparkContext.newAPIHadoopRDD(hadoopConfiguration, inputFormatClass, NullWritable.class, VertexWritable.class);
+        // re-key by vertex id, wrapping each vertex in a new writable
+        final JavaPairRDD<Object, VertexWritable> keyedRDD =
+                rawRDD.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())));
+        // if this is not done, then the graph is partitioned and you can have duplicate vertices
+        return keyedRDD.reduceByKey((a, b) -> a);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
new file mode 100644
index 0000000..19d79a8
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
+ * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface InputRDD {
+
+    /**
+     * Read the graphRDD from the underlying graph system.
+     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
+     * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
+     * @return an adjacency list representation of the underlying graph system.
+     */
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
+}



[3/3] incubator-tinkerpop git commit: Spark is now isolated into spark-gremlin package. Tests are passing. A few hacks here and there just to get things building. However, it was pretty easy to split apart -- which is promising.

Posted by ok...@apache.org.
Spark is now isolated into spark-gremlin package. Tests are passing. A few hacks here and there just to get things building. However, it was pretty easy to split apart -- which is promising.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/04f5651e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/04f5651e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/04f5651e

Branch: refs/heads/hadoop_split
Commit: 04f5651e8d8e9108a4281d959b71ff161a52c208
Parents: 0977a25
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Sep 3 12:50:24 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Sep 3 12:50:24 2015 -0600

----------------------------------------------------------------------
 hadoop-gremlin/pom.xml                          |  62 -----
 .../groovy/plugin/HadoopGremlinPlugin.java      |   3 -
 .../process/computer/spark/RuleAccumulator.java |  55 ----
 .../process/computer/spark/SparkExecutor.java   | 222 -----------------
 .../computer/spark/SparkGraphComputer.java      | 216 ----------------
 .../process/computer/spark/SparkMapEmitter.java |  45 ----
 .../process/computer/spark/SparkMemory.java     | 181 --------------
 .../process/computer/spark/SparkMessenger.java  |  83 -------
 .../computer/spark/SparkReduceEmitter.java      |  45 ----
 .../computer/spark/io/InputFormatRDD.java       |  46 ----
 .../process/computer/spark/io/InputRDD.java     |  41 ---
 .../computer/spark/io/OutputFormatRDD.java      |  48 ----
 .../process/computer/spark/io/OutputRDD.java    |  31 ---
 .../computer/spark/payload/MessagePayload.java  |  35 ---
 .../process/computer/spark/payload/Payload.java |  27 --
 .../spark/payload/ViewIncomingPayload.java      |  95 -------
 .../spark/payload/ViewOutgoingPayload.java      |  46 ----
 .../computer/spark/payload/ViewPayload.java     |  39 ---
 .../gremlin/hadoop/structure/HadoopGraph.java   |  16 +-
 .../spark/HadoopSparkGraphProvider.java         |  37 ---
 .../SparkGraphComputerProcessIntegrateTest.java |  32 ---
 ...GraphComputerGroovyProcessIntegrateTest.java |  33 ---
 .../groovy/SparkHadoopGremlinPluginTest.java    |  33 ---
 .../computer/spark/io/ExampleInputRDD.java      |  47 ----
 .../computer/spark/io/ExampleOutputRDD.java     |  45 ----
 .../computer/spark/io/InputOutputRDDTest.java   |  59 -----
 .../process/computer/spark/io/InputRDDTest.java |  54 ----
 .../computer/spark/io/OutputRDDTest.java        |  62 -----
 pom.xml                                         |   1 +
 spark-gremlin/pom.xml                           | 249 +++++++++++++++++++
 .../spark/groovy/plugin/SparkGremlinPlugin.java |  26 ++
 .../spark/process/computer/RuleAccumulator.java |  55 ++++
 .../spark/process/computer/SparkExecutor.java   | 202 +++++++++++++++
 .../process/computer/SparkGraphComputer.java    | 217 ++++++++++++++++
 .../spark/process/computer/SparkMapEmitter.java |  45 ++++
 .../spark/process/computer/SparkMemory.java     | 181 ++++++++++++++
 .../spark/process/computer/SparkMessenger.java  |  83 +++++++
 .../process/computer/SparkReduceEmitter.java    |  45 ++++
 .../process/computer/io/InputFormatRDD.java     |  47 ++++
 .../spark/process/computer/io/InputRDD.java     |  41 +++
 .../process/computer/io/OutputFormatRDD.java    |  49 ++++
 .../spark/process/computer/io/OutputRDD.java    |  31 +++
 .../computer/payload/MessagePayload.java        |  35 +++
 .../spark/process/computer/payload/Payload.java |  27 ++
 .../computer/payload/ViewIncomingPayload.java   |  95 +++++++
 .../computer/payload/ViewOutgoingPayload.java   |  46 ++++
 .../process/computer/payload/ViewPayload.java   |  39 +++
 .../spark/process/HadoopGraphProvider.java      | 166 +++++++++++++
 .../computer/HadoopSparkGraphProvider.java      |  36 +++
 .../SparkGraphComputerProcessIntegrateTest.java |  32 +++
 ...GraphComputerGroovyProcessIntegrateTest.java |  33 +++
 .../groovy/SparkHadoopGremlinPluginTest.java    |  32 +++
 .../process/computer/io/ExampleInputRDD.java    |  47 ++++
 .../process/computer/io/ExampleOutputRDD.java   |  45 ++++
 .../process/computer/io/InputOutputRDDTest.java |  59 +++++
 .../spark/process/computer/io/InputRDDTest.java |  54 ++++
 .../process/computer/io/OutputRDDTest.java      |  62 +++++
 57 files changed, 2091 insertions(+), 1727 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index cb6a050..2466da0 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -108,68 +108,6 @@ limitations under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-        <!-- SPARK GRAPH COMPUTER -->
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
-            <version>1.2.1</version>
-            <exclusions>
-                <!-- self conflicts -->
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-databind</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-lang3</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-codec</groupId>
-                    <artifactId>commons-codec</artifactId>
-                </exclusion>
-                <!-- gremlin-core conflicts -->
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>jcl-over-slf4j</artifactId>
-                </exclusion>
-                <!-- gremlin-groovy conflicts -->
-                <exclusion>
-                    <groupId>jline</groupId>
-                    <artifactId>jline</artifactId>
-                </exclusion>
-                <!-- hadoop conflicts -->
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-client</artifactId>
-                </exclusion>
-                <!-- giraph conflicts -->
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty</artifactId>
-                </exclusion>
-                <!-- lgpl conflicts -->
-                <exclusion>
-                    <groupId>com.google.code.findbugs</groupId>
-                    <artifactId>findbugs</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <!-- consistent dependencies -->
         <dependency>
             <groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
index a34153e..18c4b32 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
@@ -30,7 +30,6 @@ import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteAcceptor;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.mapreduce.MapReduceGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
@@ -71,7 +70,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
         add(IMPORT_SPACE + HDFSTools.class.getPackage().getName() + DOT_STAR);
         ////
         add(IMPORT_SPACE + GiraphGraphComputer.class.getPackage().getName() + DOT_STAR);
-        add(IMPORT_SPACE + SparkGraphComputer.class.getPackage().getName() + DOT_STAR);
         add(IMPORT_SPACE + MapReduceGraphComputer.class.getPackage().getName() + DOT_STAR);
     }};
 
@@ -92,7 +90,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", Job.class.getName()));
             ///
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", GiraphGraphComputer.class.getName()));
-            pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", SparkGraphComputer.class.getName()));
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", MapReduceGraphComputer.class.getName()));
             ///
             pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", HadoopGraph.class.getName()));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
deleted file mode 100644
index 422b676..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.spark.AccumulatorParam;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class RuleAccumulator implements AccumulatorParam<Rule> {
-
-    @Override
-    public Rule addAccumulator(final Rule a, final Rule b) {
-        if (a.getOperation().equals(Rule.Operation.NO_OP))
-            return b;
-        if (b.getOperation().equals(Rule.Operation.NO_OP))
-            return a;
-        else
-            return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
-    }
-
-    @Override
-    public Rule addInPlace(final Rule a, final Rule b) {
-        if (a.getOperation().equals(Rule.Operation.NO_OP))
-            return b;
-        if (b.getOperation().equals(Rule.Operation.NO_OP))
-            return a;
-        else
-            return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
-    }
-
-    @Override
-    public Rule zero(final Rule rule) {
-        return new Rule(Rule.Operation.NO_OP, null);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
deleted file mode 100644
index b7268b8..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import com.google.common.base.Optional;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.MessagePayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.Payload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewOutgoingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkExecutor {
-
-    private static final String[] EMPTY_ARRAY = new String[0];
-
-    private SparkExecutor() {
-    }
-
-    ////////////////////
-    // VERTEX PROGRAM //
-    ////////////////////
-
-    public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
-            final JavaPairRDD<Object, VertexWritable> graphRDD,
-            final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
-            final SparkMemory memory,
-            final Configuration apacheConfiguration) {
-
-        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
-                graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
-                graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
-                // for each partition of vertices
-                .mapPartitionsToPair(partitionIterator -> {
-                    HadoopPools.initialize(apacheConfiguration);
-                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
-                    final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
-                    final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
-                    final SparkMessenger<M> messenger = new SparkMessenger<>();
-                    workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
-                    return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
-                        final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
-                        // drop any compute properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0)
-                            vertex.dropVertexProperties(elementComputeKeysArray);
-                        final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
-                        final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
-                        final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
-                        previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
-                        ///
-                        messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
-                        workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
-                        ///
-                        final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
-                                Collections.emptyList() :
-                                IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
-                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
-                        if (!partitionIterator.hasNext())
-                            workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
-                        return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
-                    });
-                })).setName("viewOutgoingRDD");
-
-        // "message pass" by reducing on the vertex object id of the view and message payloads
-        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
-                .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
-                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
-                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
-                .reduceByKey((a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
-                    if (a instanceof ViewIncomingPayload) {
-                        ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
-                        return a;
-                    } else if (b instanceof ViewIncomingPayload) {
-                        ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
-                        return b;
-                    } else {
-                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
-                        c.mergePayload(a, messageCombiner);
-                        c.mergePayload(b, messageCombiner);
-                        return c;
-                    }
-                })
-                .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
-                .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
-                .mapValues(payload -> payload instanceof ViewIncomingPayload ?
-                        (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
-                        new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
-
-        newViewIncomingRDD.setName("viewIncomingRDD")
-                .foreachPartition(partitionIterator -> {
-                    HadoopPools.initialize(apacheConfiguration);
-                }); // need to complete a task so its BSP and the memory for this iteration is updated
-        return newViewIncomingRDD;
-    }
-
-    /////////////////
-    // MAP REDUCE //
-    ////////////////
-
-    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
-        return (null == viewIncomingRDD) ?   // there was no vertex program
-                graphRDD.mapValues(vertexWritable -> {
-                    vertexWritable.get().dropEdges();
-                    return vertexWritable;
-                }) :
-                graphRDD.leftOuterJoin(viewIncomingRDD)
-                        .mapValues(tuple -> {
-                            final StarGraph.StarVertex vertex = tuple._1().get();
-                            vertex.dropEdges();
-                            vertex.dropVertexProperties(elementComputeKeys);
-                            final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
-                            view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
-                            return tuple._1();
-                        });
-    }
-
-    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-            HadoopPools.initialize(apacheConfiguration);
-            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
-            workerMapReduce.workerStart(MapReduce.Stage.MAP);
-            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
-            return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
-                workerMapReduce.map(ComputerGraph.mapReduce(vertexWritable._2().get()), mapEmitter);
-                if (!partitionIterator.hasNext())
-                    workerMapReduce.workerEnd(MapReduce.Stage.MAP);
-                return mapEmitter.getEmissions();
-            });
-        });
-        if (mapReduce.getMapKeySort().isPresent())
-            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
-        return mapRDD;
-    }
-
-    // TODO: public static executeCombine()  is this necessary?  YES --- we groupByKey in reduce() where we want to combine first.
-
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
-            HadoopPools.initialize(apacheConfiguration);
-            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
-            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
-            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
-            return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
-                workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
-                if (!partitionIterator.hasNext())
-                    workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
-                return reduceEmitter.getEmissions();
-            });
-        });
-        if (mapReduce.getReduceKeySort().isPresent())
-            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
-        return reduceRDD;
-    }
-
-    ///////////////////
-    // Input/Output //
-    //////////////////
-
-    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION,null);
-        if (null != outputLocation) {
-            // map back to a Hadoop stream for output
-            mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
-                    ObjectWritable.class,
-                    ObjectWritable.class,
-                    SequenceFileOutputFormat.class, hadoopConfiguration);
-           // TODO: mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
-            try {
-            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
deleted file mode 100644
index eb1e411..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.FileConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputFormatRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputFormatRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Stream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
-
-    public SparkGraphComputer(final HadoopGraph hadoopGraph) {
-        super(hadoopGraph);
-    }
-
-    @Override
-    public Future<ComputerResult> submit() {
-        super.validateStatePriorToExecution();
-        // apache and hadoop configurations that are used throughout the graph computer computation
-        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
-        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
-        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
-        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-            try {
-                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
-                hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-
-        // create the completable future
-        return CompletableFuture.<ComputerResult>supplyAsync(() -> {
-            final long startTime = System.currentTimeMillis();
-            SparkMemory memory = null;
-            // delete output location
-            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-            if (null != outputLocation) {
-                try {
-                    FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-            }
-            // wire up a spark context
-            final SparkConf sparkConfiguration = new SparkConf();
-            sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
-                    /*final List<Class> classes = new ArrayList<>();
-                    classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
-                    classes.addAll(IOClasses.getSharedHadoopClasses());
-                    classes.add(ViewPayload.class);
-                    classes.add(MessagePayload.class);
-                    classes.add(ViewIncomingPayload.class);
-                    classes.add(ViewOutgoingPayload.class);
-                    sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
-
-            // create the spark configuration from the graph computer configuration
-            hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-            // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
-            try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
-                // add the project jars to the cluster
-                this.loadJars(sparkContext, hadoopConfiguration);
-                // create a message-passing friendly rdd from the input rdd
-                final JavaPairRDD<Object, VertexWritable> graphRDD;
-                try {
-                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
-                            .newInstance()
-                            .readGraphRDD(apacheConfiguration, sparkContext)
-                            .setName("graphRDD")
-                            .cache();
-                } catch (final InstantiationException | IllegalAccessException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
-                }
-                JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
-
-                ////////////////////////////////
-                // process the vertex program //
-                ////////////////////////////////
-                if (null != this.vertexProgram) {
-                    // set up the vertex program and wire up configurations
-                    memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
-                    this.vertexProgram.setup(memory);
-                    memory.broadcastMemory(sparkContext);
-                    final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
-                    this.vertexProgram.storeState(vertexProgramConfiguration);
-                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
-                    ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
-
-                    // execute the vertex program
-                    while (true) {
-                        memory.setInTask(true);
-                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
-                        memory.setInTask(false);
-                        if (this.vertexProgram.terminate(memory))
-                            break;
-                        else {
-                            memory.incrIteration();
-                            memory.broadcastMemory(sparkContext);
-                        }
-                    }
-                    // write the graph rdd using the output rdd
-                    if (!this.persist.equals(Persist.NOTHING)) {
-                        try {
-                            hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
-                                    .newInstance()
-                                    .writeGraphRDD(apacheConfiguration, graphRDD);
-                        } catch (final InstantiationException | IllegalAccessException e) {
-                            throw new IllegalStateException(e.getMessage(), e);
-                        }
-                    }
-                }
-
-                final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
-
-                //////////////////////////////
-                // process the map reducers //
-                //////////////////////////////
-                if (!this.mapReducers.isEmpty()) {
-                    final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
-                    final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
-                    for (final MapReduce mapReduce : this.mapReducers) {
-                        // execute the map reduce job
-                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
-                        mapReduce.storeState(newApacheConfiguration);
-                        // map
-                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
-                        // combine TODO: is this really needed
-                        // reduce
-                        final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
-                        // write the map reduce output back to disk (memory)
-                        SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
-                    }
-                }
-                // update runtime and return the newly computed graph
-                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
-                return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
-            }
-        });
-    }
-
-    /////////////////
-
-    private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
-        if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
-            final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
-            if (null == hadoopGremlinLocalLibs)
-                this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
-            else {
-                final String[] paths = hadoopGremlinLocalLibs.split(":");
-                for (final String path : paths) {
-                    final File file = new File(path);
-                    if (file.exists())
-                        Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
-                    else
-                        this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
-                }
-            }
-        }
-    }
-
-    public static void main(final String[] args) throws Exception {
-        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
-        new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
deleted file mode 100644
index 7141259..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
-
-    private List<Tuple2<K, V>> emissions = new ArrayList<>();
-
-    @Override
-    public void emit(final K key, final V value) {
-        this.emissions.add(new Tuple2<>(key, value));
-    }
-
-    public Iterator<Tuple2<K, V>> getEmissions() {
-        final Iterator<Tuple2<K,V>> iterator = this.emissions.iterator();
-        this.emissions = new ArrayList<>();
-        return iterator;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
deleted file mode 100644
index e2de405..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMemory implements Memory.Admin, Serializable {
-
-    public final Set<String> memoryKeys = new HashSet<>();
-    private final AtomicInteger iteration = new AtomicInteger(0);   // do these need to be atomics?
-    private final AtomicLong runtime = new AtomicLong(0l);
-    private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
-    private Broadcast<Map<String, Object>> broadcast;
-    private boolean inTask = false;
-
-    public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
-        if (null != vertexProgram) {
-            for (final String key : vertexProgram.getMemoryComputeKeys()) {
-                MemoryHelper.validateKey(key);
-                this.memoryKeys.add(key);
-            }
-        }
-        for (final MapReduce mapReduce : mapReducers) {
-            this.memoryKeys.add(mapReduce.getMemoryKey());
-        }
-        for (final String key : this.memoryKeys) {
-            this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), key, new RuleAccumulator()));
-        }
-        this.broadcast = sparkContext.broadcast(new HashMap<>());
-    }
-
-    @Override
-    public Set<String> keys() {
-        if (this.inTask)
-            return this.broadcast.getValue().keySet();
-        else {
-            final Set<String> trueKeys = new HashSet<>();
-            this.memory.forEach((key, value) -> {
-                if (value.value().getObject() != null)
-                    trueKeys.add(key);
-            });
-            return Collections.unmodifiableSet(trueKeys);
-        }
-    }
-
-    @Override
-    public void incrIteration() {
-        this.iteration.getAndIncrement();
-    }
-
-    @Override
-    public void setIteration(final int iteration) {
-        this.iteration.set(iteration);
-    }
-
-    @Override
-    public int getIteration() {
-        return this.iteration.get();
-    }
-
-    @Override
-    public void setRuntime(final long runTime) {
-        this.runtime.set(runTime);
-    }
-
-    @Override
-    public long getRuntime() {
-        return this.runtime.get();
-    }
-
-    @Override
-    public <R> R get(final String key) throws IllegalArgumentException {
-        final R r = this.getValue(key);
-        if (null == r)
-            throw Memory.Exceptions.memoryDoesNotExist(key);
-        else
-            return r;
-    }
-
-    @Override
-    public void incr(final String key, final long delta) {
-        checkKeyValue(key, delta);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.INCR, this.<Long>getValue(key) + delta));
-    }
-
-    @Override
-    public void and(final String key, final boolean bool) {
-        checkKeyValue(key, bool);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.AND, this.<Boolean>getValue(key) && bool));
-    }
-
-    @Override
-    public void or(final String key, final boolean bool) {
-        checkKeyValue(key, bool);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.OR, this.<Boolean>getValue(key) || bool));
-    }
-
-    @Override
-    public void set(final String key, final Object value) {
-        checkKeyValue(key, value);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.memoryString(this);
-    }
-
-    protected void setInTask(final boolean inTask) {
-        this.inTask = inTask;
-    }
-
-    protected void broadcastMemory(final JavaSparkContext sparkContext) {
-        this.broadcast.destroy(true); // do we need to block?
-        final Map<String, Object> toBroadcast = new HashMap<>();
-        this.memory.forEach((key, rule) -> {
-            if (null != rule.value().getObject())
-                toBroadcast.put(key, rule.value().getObject());
-        });
-        this.broadcast = sparkContext.broadcast(toBroadcast);
-    }
-
-    private void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryKeys.contains(key))
-            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
-        MemoryHelper.validateValue(value);
-    }
-
-    private <R> R getValue(final String key) {
-        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
deleted file mode 100644
index f52843b..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMessenger<M> implements Messenger<M> {
-
-    private Vertex vertex;
-    private Iterable<M> incomingMessages;
-    private List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
-
-    public void setVertexAndIncomingMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
-        this.vertex = vertex;
-        this.incomingMessages = incomingMessages;
-        this.outgoingMessages = new ArrayList<>();
-    }
-
-    public List<Tuple2<Object, M>> getOutgoingMessages() {
-        return this.outgoingMessages;
-    }
-
-    @Override
-    public Iterator<M> receiveMessages() {
-        return this.incomingMessages.iterator();
-    }
-
-    @Override
-    public void sendMessage(final MessageScope messageScope, final M message) {
-        if (messageScope instanceof MessageScope.Local) {
-            final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
-            final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
-            final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
-            incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
-        } else {
-            ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message)));
-        }
-    }
-
-    ///////////
-
-    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
-        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
-        return (T) incidentTraversal;
-    }
-
-    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
-        final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
-        return step.getDirection().opposite();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
deleted file mode 100644
index a5d0175..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
-
-    private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
-
-    @Override
-    public void emit(final OK key, final OV value) {
-        this.emissions.add(new Tuple2<>(key, value));
-    }
-
-    public Iterator<Tuple2<OK, OV>> getEmissions() {
-        final Iterator<Tuple2<OK, OV>> iterator = this.emissions.iterator();
-        this.emissions = new ArrayList<>();
-        return iterator;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
deleted file mode 100644
index 082c3b6..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class InputFormatRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
-        return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
-                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
-                NullWritable.class,
-                VertexWritable.class)
-                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
-                .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
deleted file mode 100644
index 75f602b..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
- * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface InputRDD {
-
-    /**
-     * Read the graphRDD from the underlying graph system.
-     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer}.
-     * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
-     * @return an adjacency list representation of the underlying graph system.
-     */
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
deleted file mode 100644
index 8d64fd2..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class OutputFormatRDD implements OutputRDD {
-
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null != outputLocation) {
-            // map back to a <nullwritable,vertexwritable> stream for output
-            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
-                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
-                            NullWritable.class,
-                            VertexWritable.class,
-                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
deleted file mode 100644
index 8f6ef7a..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface OutputRDD {
-
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
deleted file mode 100644
index 191c9ca..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class MessagePayload<M> implements Payload {
-
-    private final M message;
-
-    public MessagePayload(final M message) {
-        this.message = message;
-    }
-
-    public M getMessage() {
-        return this.message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
deleted file mode 100644
index d901df7..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import java.io.Serializable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface Payload extends Serializable {
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
deleted file mode 100644
index b236fe9..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewIncomingPayload<M> implements Payload {
-
-    private List<DetachedVertexProperty<Object>> view = null;
-    private final List<M> incomingMessages;
-
-
-    public ViewIncomingPayload() {
-        this.incomingMessages = null;
-    }
-
-    public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
-        this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
-    }
-
-    public ViewIncomingPayload(final ViewPayload viewPayload) {
-        this.incomingMessages = null;
-        this.view = viewPayload.getView();
-    }
-
-
-    public List<DetachedVertexProperty<Object>> getView() {
-        return null == this.view ? Collections.emptyList() : this.view;
-    }
-
-
-    public List<M> getIncomingMessages() {
-        return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
-    }
-
-    public boolean hasView() {
-        return null != view;
-    }
-
-    ////////////////////
-
-
-    private void mergeMessage(final M message, final MessageCombiner<M> messageCombiner) {
-        if (this.incomingMessages.isEmpty() || null == messageCombiner)
-            this.incomingMessages.add(message);
-        else
-            this.incomingMessages.set(0, messageCombiner.combine(this.incomingMessages.get(0), message));
-    }
-
-    private void mergeViewIncomingPayload(final ViewIncomingPayload<M> viewIncomingPayload, final MessageCombiner<M> messageCombiner) {
-        if (this.view == null)
-            this.view = viewIncomingPayload.view;
-        else
-            this.view.addAll(viewIncomingPayload.getView());
-
-        for (final M message : viewIncomingPayload.getIncomingMessages()) {
-            this.mergeMessage(message, messageCombiner);
-        }
-    }
-
-    public void mergePayload(final Payload payload, final MessageCombiner<M> messageCombiner) {
-        if (payload instanceof ViewPayload)
-            this.view = ((ViewPayload) payload).getView();
-        else if (payload instanceof MessagePayload)
-            this.mergeMessage(((MessagePayload<M>) payload).getMessage(), messageCombiner);
-        else if (payload instanceof ViewIncomingPayload)
-            this.mergeViewIncomingPayload((ViewIncomingPayload<M>) payload, messageCombiner);
-        else
-            throw new IllegalArgumentException("The provided payload is an unsupported merge payload: " + payload);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
deleted file mode 100644
index c4daf47..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-import scala.Tuple2;
-
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewOutgoingPayload<M> implements Payload {
-
-    private final List<DetachedVertexProperty<Object>> view;
-    private final List<Tuple2<Object,M>> outgoingMessages;
-
-    public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view, final List<Tuple2<Object,M>> outgoingMessages) {
-        this.view = view;
-        this.outgoingMessages = outgoingMessages;
-    }
-
-    public ViewPayload getView() {
-        return new ViewPayload(this.view);
-    }
-
-    public List<Tuple2<Object,M>> getOutgoingMessages() {
-        return this.outgoingMessages;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
deleted file mode 100644
index 0ec5ef5..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewPayload implements Payload {
-
-    private final List<DetachedVertexProperty<Object>> view;
-
-    public ViewPayload(final List<DetachedVertexProperty<Object>> view) {
-        this.view = view;
-    }
-
-    public List<DetachedVertexProperty<Object>> getView() {
-        return this.view;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 18d515e..3c3c9b7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopEdgeIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopVertexIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
@@ -192,10 +191,17 @@ public final class HadoopGraph implements Graph {
     public <C extends GraphComputer> C compute(final Class<C> graphComputerClass) {
         if (graphComputerClass.equals(GiraphGraphComputer.class))
             return (C) new GiraphGraphComputer(this);
-        else if (graphComputerClass.equals(SparkGraphComputer.class))
-            return (C) new SparkGraphComputer(this);
-        else
-            throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass);
+        else {
+            try {
+                return graphComputerClass.getConstructor(HadoopGraph.class).newInstance(this);
+            } catch (final Exception e) {
+                throw new IllegalArgumentException(e.getMessage(), e);
+            }
+        }
+        //else if (graphComputerClass.equals(SparkGraphComputer.class))
+        //    return (C) new SparkGraphComputer(this);
+        //else
+        //   throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
deleted file mode 100644
index af0d745..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.GraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@GraphProvider.Descriptor(computer = SparkGraphComputer.class)
-public final class HadoopSparkGraphProvider extends HadoopGraphProvider {
-
-    public GraphTraversalSource traversal(final Graph graph) {
-        return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
deleted file mode 100644
index ae729fd..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(ProcessComputerSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkGraphComputerProcessIntegrateTest {
-}