You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/09 22:20:32 UTC

[02/18] incubator-tinkerpop git commit: Spark is now isolated into spark-gremlin package. Tests are passing. A few hacks here and there just to get things building. However, it was pretty easy to split apart -- which is promising.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
deleted file mode 100644
index cc07bac..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.HadoopSparkGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(GroovyProcessComputerSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkGraphComputerGroovyProcessIntegrateTest {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
deleted file mode 100644
index 3d83142..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopPluginSuite;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.HadoopSparkGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(HadoopPluginSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkHadoopGremlinPluginTest {
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
deleted file mode 100644
index d456808..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.T;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleInputRDD implements InputRDD {
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
-        final List<Vertex> list = new ArrayList<>();
-        list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
-        list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
-        list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
-        list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
-        return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
deleted file mode 100644
index 45ed114..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleOutputRDD implements OutputRDD {
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        int totalAge = 0;
-        final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
-        while (iterator.hasNext()) {
-            final Vertex vertex = iterator.next().get();
-            if (vertex.label().equals("person"))
-                totalAge = totalAge + vertex.<Integer>value("age");
-        }
-        assertEquals(123, totalAge);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
deleted file mode 100644
index ea1cb87..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputOutputRDDTest {
-
-    @Test
-    public void shouldReadFromWriteToArbitraryRDD() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
deleted file mode 100644
index c600327..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputRDDTest {
-
-    @Test
-    public void shouldReadFromArbitraryRDD() {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
-        assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
deleted file mode 100644
index deea6ab..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class OutputRDDTest {
-
-    @Test
-    public void shouldWriteToArbitraryRDD() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, HadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        ////////
-        Graph graph = GraphFactory.open(configuration);
-        graph.compute(SparkGraphComputer.class)
-                .result(GraphComputer.ResultGraph.NEW)
-                .persist(GraphComputer.Persist.EDGES)
-                .program(TraversalVertexProgram.build()
-                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
-                                "gremlin-groovy",
-                                "g.V()").create(graph)).submit().get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 769f84f..7a4411c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@ limitations under the License.
         <module>gremlin-driver</module>
         <module>gremlin-console</module>
         <module>gremlin-server</module>
+        <module>spark-gremlin</module>
     </modules>
     <scm>
         <connection>scm:git:git@git-wip-us.apache.org:repos/asf/incubator-tinkerpop.git</connection>

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
new file mode 100644
index 0000000..277ce11
--- /dev/null
+++ b/spark-gremlin/pom.xml
@@ -0,0 +1,249 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.tinkerpop</groupId>
+        <artifactId>tinkerpop</artifactId>
+        <version>3.1.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>spark-gremlin</artifactId>
+    <name>Apache TinkerPop :: Spark Gremlin</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-groovy</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>hadoop-gremlin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- SPARK GRAPH COMPUTER -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <version>1.2.1</version>
+            <exclusions>
+                <!-- self conflicts -->
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-lang3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <!-- gremlin-core conflicts -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>jcl-over-slf4j</artifactId>
+                </exclusion>
+                <!-- gremlin-groovy conflicts -->
+                <exclusion>
+                    <groupId>jline</groupId>
+                    <artifactId>jline</artifactId>
+                </exclusion>
+                <!-- hadoop conflicts -->
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <!-- giraph conflicts -->
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <!-- lgpl conflicts -->
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>findbugs</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- consistent dependencies -->
+<!--        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.10.3</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>3.5.13.Final</version>
+        </dependency>
+-->
+        <!-- TEST -->
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-test</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-groovy-test</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>tinkergraph-gremlin</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <repositories>
+        <repository>
+            <id>hyracks-releases</id>
+            <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public-releases/</url>
+        </repository>
+    </repositories>
+    <build>
+        <directory>${basedir}/target</directory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>${basedir}/src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>build-detached-assemblies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <attach>false</attach>
+                            <descriptors>
+                                <descriptor>src/assembly/standalone.xml</descriptor>
+                                <descriptor>src/assembly/hadoop-job.xml</descriptor>
+                            </descriptors>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.gmavenplus</groupId>
+                <artifactId>gmavenplus-plugin</artifactId>
+                <version>1.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>addSources</goal>
+                            <goal>addTestSources</goal>
+                            <goal>generateStubs</goal>
+                            <goal>compile</goal>
+                            <goal>testGenerateStubs</goal>
+                            <goal>testCompile</goal>
+                            <goal>removeStubs</goal>
+                            <goal>removeTestStubs</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <invokeDynamic>true</invokeDynamic>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+                <configuration>
+                    <archive>
+                        <manifestEntries>
+                            <Gremlin-Plugin-Dependencies>org.apache.hadoop:hadoop-core:1.2.1
+                            </Gremlin-Plugin-Dependencies>
+                            <!-- deletes the servlet-api jar from the path after install - causes conflicts -->
+                            <Gremlin-Plugin-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+                            </Gremlin-Plugin-Paths>
+                            <Gremlin-Lib-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+                            </Gremlin-Lib-Paths>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <!-- log4j.configuration>log4j-core.properties</log4j.configuration> -->
+                    <!--<argLine>-Xmx2048M</argLine>-->
+                    <excludes>
+                        <exclude>**/*IntegrateTest.java</exclude>
+                        <exclude>**/*PerformanceTest.java</exclude>
+                        <!-- this is technically a member of the integration test suite -->
+                        <exclude>**/HadoopGremlinPluginTest.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
new file mode 100644
index 0000000..0b04300
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.groovy.plugin;
+
+/**
+ * Placeholder for the spark-gremlin Groovy plugin (see the {@code spark.groovy.plugin} package). It currently
+ * declares no members and provides no behavior of its own.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkGremlinPlugin {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
new file mode 100644
index 0000000..bebd283
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.spark.AccumulatorParam;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+
+/**
+ * Spark {@link AccumulatorParam} that merges {@link Rule}s.
+ * <p>
+ * A rule whose operation is {@link Rule.Operation#NO_OP} acts as the identity element: merging it with any other
+ * rule yields the other rule. Otherwise the objects of both rules are combined using the operation of the
+ * right-hand rule, and a new rule carrying that operation is returned.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RuleAccumulator implements AccumulatorParam<Rule> {
+
+    /**
+     * Adds {@code b} to {@code a}. Delegates to {@link #addInPlace(Rule, Rule)} so that both merge entry points
+     * are guaranteed to apply exactly the same semantics.
+     */
+    @Override
+    public Rule addAccumulator(final Rule a, final Rule b) {
+        return this.addInPlace(a, b);
+    }
+
+    /**
+     * Merges two rules.
+     *
+     * @param a the left-hand rule
+     * @param b the right-hand rule, whose operation is used when both rules are non-no-op
+     * @return {@code b} if {@code a} is a no-op, {@code a} if {@code b} is a no-op, otherwise a new rule holding
+     * {@code b}'s operation and the result of applying it to the objects of {@code a} and {@code b}
+     */
+    @Override
+    public Rule addInPlace(final Rule a, final Rule b) {
+        if (a.getOperation().equals(Rule.Operation.NO_OP))
+            return b;
+        if (b.getOperation().equals(Rule.Operation.NO_OP))
+            return a;
+        return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
+    }
+
+    /**
+     * Returns the identity element of this accumulator: a no-op rule with a {@code null} object.
+     * The supplied rule is ignored.
+     */
+    @Override
+    public Rule zero(final Rule rule) {
+        return new Rule(Rule.Operation.NO_OP, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
new file mode 100644
index 0000000..288538f
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -0,0 +1,202 @@
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import com.google.common.base.Optional;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Stateless helpers that express {@link VertexProgram} iterations and {@link MapReduce} stages as Spark RDD
+ * transformations, plus persistence of map reduce results.
+ * <p>
+ * Every partition-level function re-creates its own worker copy of the vertex program or map reduce from the
+ * supplied configuration. The worker's end callback is only issued after the last element of a partition has
+ * been processed, so empty partitions are skipped entirely rather than started and never ended.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkExecutor {
+
+    private static final String[] EMPTY_ARRAY = new String[0];
+
+    private SparkExecutor() {
+    }
+
+    ////////////////////
+    // VERTEX PROGRAM //
+    ////////////////////
+
+    /**
+     * Executes a single vertex program iteration over all vertices and then "message passes" by reducing each
+     * vertex's new view together with the messages addressed to it.
+     *
+     * @param graphRDD            the graph keyed by vertex id
+     * @param viewIncomingRDD     the views and incoming messages produced by the previous iteration, or
+     *                            {@code null} for the first iteration
+     * @param memory              the memory of the current iteration
+     * @param apacheConfiguration the configuration from which each worker re-creates the vertex program
+     * @param <M>                 the message type
+     * @return the view and incoming messages of every existing vertex, for use by the next iteration
+     */
+    public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
+            final JavaPairRDD<Object, VertexWritable> graphRDD,
+            final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
+            final SparkMemory memory,
+            final Configuration apacheConfiguration) {
+
+        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
+                graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
+                graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
+                // for each partition of vertices
+                .mapPartitionsToPair(partitionIterator -> {
+                    HadoopPools.initialize(apacheConfiguration);
+                    // workerIterationEnd() is only called after the last vertex of the partition, so an empty
+                    // partition must not start a worker iteration that would never be ended
+                    if (!partitionIterator.hasNext())
+                        return Collections.<Tuple2<Object, ViewOutgoingPayload<M>>>emptyList();
+                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
+                    final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
+                    final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+                    final SparkMessenger<M> messenger = new SparkMessenger<>();
+                    workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
+                    return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+                        final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
+                        // drop any compute properties that are cached in memory
+                        if (elementComputeKeysArray.length > 0)
+                            vertex.dropVertexProperties(elementComputeKeysArray);
+                        final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
+                        final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
+                        final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
+                        previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
+                        ///
+                        messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
+                        workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
+                        ///
+                        final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
+                                Collections.emptyList() :
+                                IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
+                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+                        if (!partitionIterator.hasNext())
+                            workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
+                        return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+                    });
+                })).setName("viewOutgoingRDD");
+
+        // "message pass" by reducing on the vertex object id of the view and message payloads
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
+        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
+                .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
+                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
+                .reduceByKey((a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+                    if (a instanceof ViewIncomingPayload) {
+                        ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
+                        return a;
+                    } else if (b instanceof ViewIncomingPayload) {
+                        ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
+                        return b;
+                    } else {
+                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
+                        c.mergePayload(a, messageCombiner);
+                        c.mergePayload(b, messageCombiner);
+                        return c;
+                    }
+                })
+                .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
+                .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
+                .mapValues(payload -> payload instanceof ViewIncomingPayload ?
+                        (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
+                        new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
+
+        newViewIncomingRDD.setName("viewIncomingRDD")
+                .foreachPartition(partitionIterator -> {
+                    HadoopPools.initialize(apacheConfiguration);
+                }); // run an action so the iteration completes (BSP barrier) and the memory for this iteration is updated
+        return newViewIncomingRDD;
+    }
+
+    /////////////////
+    // MAP REDUCE //
+    ////////////////
+
+    /**
+     * Prepares the graph for map reduce. Every vertex has its edges dropped; when a vertex program was executed,
+     * the compute-key properties are also dropped and the properties of the final view are attached instead.
+     * The vertex held by each {@link VertexWritable} is modified in place and the same writable is returned.
+     *
+     * @param graphRDD           the graph keyed by vertex id
+     * @param viewIncomingRDD    the final views of the vertex program, or {@code null} if none was executed
+     * @param elementComputeKeys the compute keys of the vertex program (unused when {@code viewIncomingRDD} is null)
+     * @param <M>                the message type of the vertex program
+     * @return the prepared graph
+     */
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
+        return (null == viewIncomingRDD) ?   // there was no vertex program
+                graphRDD.mapValues(vertexWritable -> {
+                    vertexWritable.get().dropEdges();
+                    return vertexWritable;
+                }) :
+                graphRDD.leftOuterJoin(viewIncomingRDD)
+                        .mapValues(tuple -> {
+                            final StarGraph.StarVertex vertex = tuple._1().get();
+                            vertex.dropEdges();
+                            vertex.dropVertexProperties(elementComputeKeys);
+                            final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
+                            view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
+                            return tuple._1();
+                        });
+    }
+
+    /**
+     * Runs the map stage of {@code mapReduce} over every vertex, optionally sorting the emitted pairs by key.
+     *
+     * @param graphRDD            the prepared graph keyed by vertex id
+     * @param mapReduce           the driver-side map reduce, consulted for its map key sort
+     * @param apacheConfiguration the configuration from which each worker re-creates the map reduce
+     * @return the emitted key/value pairs
+     */
+    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
+            HadoopPools.initialize(apacheConfiguration);
+            // workerEnd() is only called after the last vertex, so never start a worker for an empty partition
+            if (!partitionIterator.hasNext())
+                return Collections.<Tuple2<K, V>>emptyList();
+            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+            workerMapReduce.workerStart(MapReduce.Stage.MAP);
+            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
+            return () -> IteratorUtils.flatMap(partitionIterator, idAndVertex -> {
+                workerMapReduce.map(ComputerGraph.mapReduce(idAndVertex._2().get()), mapEmitter);
+                if (!partitionIterator.hasNext())
+                    workerMapReduce.workerEnd(MapReduce.Stage.MAP);
+                return mapEmitter.getEmissions();
+            });
+        });
+        if (mapReduce.getMapKeySort().isPresent())
+            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
+        return mapRDD;
+    }
+
+    // TODO: public static executeCombine()  is this necessary?  YES --- we groupByKey in reduce() where we want to combine first.
+
+    /**
+     * Groups the map output by key and runs the reduce stage of {@code mapReduce} on each group, optionally
+     * sorting the reduced pairs by key.
+     *
+     * @param mapRDD              the output of the map stage
+     * @param mapReduce           the driver-side map reduce, consulted for its reduce key sort
+     * @param apacheConfiguration the configuration from which each worker re-creates the map reduce
+     * @return the reduced key/value pairs
+     */
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
+            HadoopPools.initialize(apacheConfiguration);
+            // workerEnd() is only called after the last key, so never start a worker for an empty partition
+            if (!partitionIterator.hasNext())
+                return Collections.<Tuple2<OK, OV>>emptyList();
+            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
+            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
+            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
+            return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
+                workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
+                if (!partitionIterator.hasNext())
+                    workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
+                return reduceEmitter.getEmissions();
+            });
+        });
+        if (mapReduce.getReduceKeySort().isPresent())
+            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
+        return reduceRDD;
+    }
+
+    ///////////////////
+    // Input/Output //
+    //////////////////
+
+    /**
+     * If an output location is configured, writes the map reduce result as a SequenceFile of
+     * {@link ObjectWritable} pairs to {@code <outputLocation>/<memoryKey>}. It then reads that file back to add
+     * the result to {@code memory}. Does nothing when no output location is configured.
+     *
+     * @throws IllegalStateException if the written output cannot be read back
+     */
+    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        if (null != outputLocation) {
+            final String memoryKeyLocation = outputLocation + "/" + mapReduce.getMemoryKey();
+            // map back to a Hadoop stream for output
+            mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(memoryKeyLocation,
+                    ObjectWritable.class,
+                    ObjectWritable.class,
+                    SequenceFileOutputFormat.class, hadoopConfiguration);
+            // TODO: mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
+            try {
+                mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(memoryKeyLocation)));
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
new file mode 100644
index 0000000..3aa4383
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+
+/**
+ * A {@link GraphComputer} that executes a vertex program and/or map reducers over a {@link HadoopGraph}
+ * using Apache Spark.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
+
+    public SparkGraphComputer(final HadoopGraph hadoopGraph) {
+        super(hadoopGraph);
+    }
+
+    /**
+     * Asynchronously executes the configured vertex program (if any) followed by the configured map reducers
+     * (if any) on Spark.
+     * <p>
+     * An input location that cannot be resolved is reported synchronously as an {@link IllegalStateException}.
+     * Failures during execution complete the returned future exceptionally.
+     *
+     * @return a future holding the resulting graph and memory
+     */
+    @Override
+    public Future<ComputerResult> submit() {
+        super.validateStatePriorToExecution();
+        // apache and hadoop configurations that are used throughout the graph computer computation
+        final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
+        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+            try {
+                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+                apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
+                hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        // create the completable future
+        return CompletableFuture.<ComputerResult>supplyAsync(() -> {
+            final long startTime = System.currentTimeMillis();
+            SparkMemory memory = null;
+            // delete output location
+            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+            if (null != outputLocation) {
+                try {
+                    FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+            }
+            // wire up a spark context
+            final SparkConf sparkConfiguration = new SparkConf();
+            sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
+                    /*final List<Class> classes = new ArrayList<>();
+                    classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
+                    classes.addAll(IOClasses.getSharedHadoopClasses());
+                    classes.add(ViewPayload.class);
+                    classes.add(MessagePayload.class);
+                    classes.add(ViewIncomingPayload.class);
+                    classes.add(ViewOutgoingPayload.class);
+                    sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
+
+            // create the spark configuration from the graph computer configuration
+            hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+            // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
+            try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+                // add the project jars to the cluster
+                this.loadJars(sparkContext, hadoopConfiguration);
+                // create a message-passing friendly rdd from the input rdd
+                final JavaPairRDD<Object, VertexWritable> graphRDD;
+                try {
+                    graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+                            .newInstance()
+                            .readGraphRDD(apacheConfiguration, sparkContext)
+                            .setName("graphRDD")
+                            .cache();
+                } catch (final InstantiationException | IllegalAccessException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+                JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
+
+                ////////////////////////////////
+                // process the vertex program //
+                ////////////////////////////////
+                if (null != this.vertexProgram) {
+                    // set up the vertex program and wire up configurations
+                    memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
+                    this.vertexProgram.setup(memory);
+                    memory.broadcastMemory(sparkContext);
+                    final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
+                    this.vertexProgram.storeState(vertexProgramConfiguration);
+                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+                    ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+
+                    // execute the vertex program
+                    while (true) {
+                        memory.setInTask(true);
+                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+                        memory.setInTask(false);
+                        if (this.vertexProgram.terminate(memory))
+                            break;
+                        else {
+                            memory.incrIteration();
+                            memory.broadcastMemory(sparkContext);
+                        }
+                    }
+                    // write the graph rdd using the output rdd
+                    if (!this.persist.equals(GraphComputer.Persist.NOTHING)) {
+                        try {
+                            hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
+                                    .newInstance()
+                                    .writeGraphRDD(apacheConfiguration, graphRDD);
+                        } catch (final InstantiationException | IllegalAccessException e) {
+                            throw new IllegalStateException(e.getMessage(), e);
+                        }
+                    }
+                }
+
+                final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
+
+                //////////////////////////////
+                // process the map reducers //
+                //////////////////////////////
+                if (!this.mapReducers.isEmpty()) {
+                    final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
+                    final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
+                    for (final MapReduce mapReduce : this.mapReducers) {
+                        // execute the map reduce job
+                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                        mapReduce.storeState(newApacheConfiguration);
+                        // map
+                        final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
+                        // combine TODO: is this really needed
+                        // reduce
+                        final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
+                        // write the map reduce output back to disk (memory)
+                        SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+                    }
+                }
+                // update runtime and return the newly computed graph
+                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
+                return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
+            }
+        });
+    }
+
+    /////////////////
+
+    /**
+     * Adds every {@code .jar} file found in the directories listed (colon-separated) in the
+     * {@link Constants#HADOOP_GREMLIN_LIBS} environment variable to the Spark context. This is only done when
+     * {@link Constants#GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE} is enabled (the default). Missing or invalid
+     * entries are logged and skipped.
+     */
+    private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
+        if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true))
+            return;
+        final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+        if (null == hadoopGremlinLocalLibs) {
+            this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            return;
+        }
+        for (final String path : hadoopGremlinLocalLibs.split(":")) {
+            // File.listFiles() returns null for anything that is not a readable directory (e.g. a plain jar file),
+            // which would make Stream.of(...) throw a NullPointerException -- treat such entries as invalid
+            final File directory = new File(path);
+            final File[] files = directory.isDirectory() ? directory.listFiles() : null;
+            if (null == files) {
+                this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
+                continue;
+            }
+            Stream.of(files).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
+        }
+    }
+
+    /**
+     * Command-line entry point. {@code args[0]} is the path of a properties file describing both the
+     * {@link HadoopGraph} and the vertex program to run. Blocks until the computation completes.
+     */
+    public static void main(final String[] args) throws Exception {
+        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
new file mode 100644
index 0000000..cf31249
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link MapReduce.MapEmitter} that buffers every emitted key/value pair as a {@link Tuple2} until the buffer is
+ * drained with {@link #getEmissions()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
+
+    // pairs emitted since the previous call to getEmissions(); swapped for a new list on every drain
+    private List<Tuple2<K, V>> emissions = new ArrayList<>();
+
+    @Override
+    public void emit(final K key, final V value) {
+        final Tuple2<K, V> pair = new Tuple2<>(key, value);
+        this.emissions.add(pair);
+    }
+
+    /**
+     * Hands back everything emitted so far and starts a fresh, empty buffer.
+     *
+     * @return an iterator over the pairs emitted since the previous drain
+     */
+    public Iterator<Tuple2<K, V>> getEmissions() {
+        final List<Tuple2<K, V>> drained = this.emissions;
+        this.emissions = new ArrayList<>();
+        return drained.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
new file mode 100644
index 0000000..dbfadcc
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The Spark implementation of {@link Memory}.
+ * <p>
+ * Every memory key is backed by a Spark {@link Accumulator} of {@link Rule}s. On the driver ({@code inTask == false})
+ * reads come from the accumulator values and writes replace them via {@code setValue}. Inside a task
+ * ({@code inTask == true}) reads come from the most recent {@link Broadcast} snapshot produced by
+ * {@link #broadcastMemory(JavaSparkContext)}, and writes are added to the accumulators as rules.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMemory implements Memory.Admin, Serializable {
+
+    public final Set<String> memoryKeys = new HashSet<>();
+    private final AtomicInteger iteration = new AtomicInteger(0);   // do these need to be atomics?
+    private final AtomicLong runtime = new AtomicLong(0L);
+    private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
+    private Broadcast<Map<String, Object>> broadcast;
+    private boolean inTask = false;
+
+    /**
+     * Registers one accumulator per memory key declared by the vertex program and the map-reducers.
+     *
+     * @param vertexProgram the vertex program whose memory compute keys are registered (may be {@code null})
+     * @param mapReducers   the map-reducers whose memory keys are registered
+     * @param sparkContext  the context used to create the accumulators and the initial (empty) broadcast
+     */
+    public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
+        if (null != vertexProgram) {
+            for (final String key : vertexProgram.getMemoryComputeKeys()) {
+                MemoryHelper.validateKey(key);
+                this.memoryKeys.add(key);
+            }
+        }
+        for (final MapReduce mapReduce : mapReducers) {
+            this.memoryKeys.add(mapReduce.getMemoryKey());
+        }
+        for (final String key : this.memoryKeys) {
+            this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), key, new RuleAccumulator()));
+        }
+        this.broadcast = sparkContext.broadcast(new HashMap<>());
+    }
+
+    /**
+     * Returns the keys that currently hold a value, as an unmodifiable set on both the driver and task side.
+     */
+    @Override
+    public Set<String> keys() {
+        if (this.inTask)
+            // value() (unlike getValue()) asserts the broadcast has not been destroyed; wrap so callers cannot
+            // mutate the broadcast map through its key view
+            return Collections.unmodifiableSet(this.broadcast.value().keySet());
+        else {
+            final Set<String> trueKeys = new HashSet<>();
+            this.memory.forEach((key, value) -> {
+                if (value.value().getObject() != null)
+                    trueKeys.add(key);
+            });
+            return Collections.unmodifiableSet(trueKeys);
+        }
+    }
+
+    @Override
+    public void incrIteration() {
+        this.iteration.getAndIncrement();
+    }
+
+    @Override
+    public void setIteration(final int iteration) {
+        this.iteration.set(iteration);
+    }
+
+    @Override
+    public int getIteration() {
+        return this.iteration.get();
+    }
+
+    @Override
+    public void setRuntime(final long runTime) {
+        this.runtime.set(runTime);
+    }
+
+    @Override
+    public long getRuntime() {
+        return this.runtime.get();
+    }
+
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        final R r = this.getValue(key);
+        if (null == r)
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        else
+            return r;
+    }
+
+    @Override
+    public void incr(final String key, final long delta) {
+        checkKeyValue(key, delta);
+        if (this.inTask)
+            this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta));
+        else {
+            final Long current = this.getValue(key);
+            // an unset key acts as 0 rather than failing with a NullPointerException on unboxing
+            this.memory.get(key).setValue(new Rule(Rule.Operation.INCR, null == current ? delta : current + delta));
+        }
+    }
+
+    @Override
+    public void and(final String key, final boolean bool) {
+        checkKeyValue(key, bool);
+        if (this.inTask)
+            this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
+        else {
+            final Boolean current = this.getValue(key);
+            // an unset key acts as true (the identity of AND) rather than failing on unboxing
+            this.memory.get(key).setValue(new Rule(Rule.Operation.AND, null == current ? bool : current && bool));
+        }
+    }
+
+    @Override
+    public void or(final String key, final boolean bool) {
+        checkKeyValue(key, bool);
+        if (this.inTask)
+            this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
+        else {
+            final Boolean current = this.getValue(key);
+            // an unset key acts as false (the identity of OR) rather than failing on unboxing
+            this.memory.get(key).setValue(new Rule(Rule.Operation.OR, null == current ? bool : current || bool));
+        }
+    }
+
+    @Override
+    public void set(final String key, final Object value) {
+        checkKeyValue(key, value);
+        if (this.inTask)
+            this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
+        else
+            this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.memoryString(this);
+    }
+
+    protected void setInTask(final boolean inTask) {
+        this.inTask = inTask;
+    }
+
+    /**
+     * Destroys the previous broadcast and broadcasts a fresh snapshot of every key that currently holds a value.
+     *
+     * @param sparkContext the context used to create the new broadcast
+     */
+    protected void broadcastMemory(final JavaSparkContext sparkContext) {
+        this.broadcast.destroy(true); // do we need to block?
+        final Map<String, Object> toBroadcast = new HashMap<>();
+        this.memory.forEach((key, rule) -> {
+            final Object object = rule.value().getObject();
+            if (null != object)
+                toBroadcast.put(key, object);
+        });
+        this.broadcast = sparkContext.broadcast(toBroadcast);
+    }
+
+    private void checkKeyValue(final String key, final Object value) {
+        if (!this.memoryKeys.contains(key))
+            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
+        MemoryHelper.validateValue(value);
+    }
+
+    // reads from the broadcast snapshot inside a task, or from the accumulator on the driver; null when unset
+    @SuppressWarnings("unchecked")
+    private <R> R getValue(final String key) {
+        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
new file mode 100644
index 0000000..f32c684
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * The Spark implementation of {@link Messenger}. It exposes the incoming messages of the vertex currently being
+ * processed and collects outgoing messages as (target vertex id, message) pairs.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMessenger<M> implements Messenger<M> {
+
+    private Vertex vertex;
+    private Iterable<M> incomingMessages;
+    private List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
+
+    /**
+     * Prepares the messenger for a new vertex: records the vertex and its incoming messages and starts a fresh,
+     * empty outgoing-message list.
+     */
+    public void setVertexAndIncomingMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
+        this.vertex = vertex;
+        this.incomingMessages = incomingMessages;
+        this.outgoingMessages = new ArrayList<>();
+    }
+
+    /**
+     * @return the (target vertex id, message) pairs sent since the last call to
+     * {@link #setVertexAndIncomingMessages(Vertex, Iterable)}
+     */
+    public List<Tuple2<Object, M>> getOutgoingMessages() {
+        return this.outgoingMessages;
+    }
+
+    @Override
+    public Iterator<M> receiveMessages() {
+        return this.incomingMessages.iterator();
+    }
+
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        if (messageScope instanceof MessageScope.Local) {
+            final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
+            final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
+            final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
+            incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(this.getTargetId(edge, direction), message)));
+        } else {
+            ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message)));
+        }
+    }
+
+    ///////////
+
+    /**
+     * Resolves the id of the vertex on the far side of {@code edge} from the current vertex.
+     * For {@link Direction#BOTH}, {@code edge.vertices(BOTH)} yields the out-vertex first, which is the current
+     * vertex itself for its out-edges; so the endpoint that is not the current vertex is chosen explicitly
+     * (a self-loop resolves back to the current vertex).
+     */
+    private Object getTargetId(final Edge edge, final Direction direction) {
+        if (Direction.BOTH != direction)
+            return edge.vertices(direction).next().id();
+        final Object outId = edge.outVertex().id();
+        return this.vertex.id().equals(outId) ? edge.inVertex().id() : outId;
+    }
+
+    // prepends a StartStep rooted at the given vertex so the incident traversal emits that vertex's edges
+    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
+        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
+        return (T) incidentTraversal;
+    }
+
+    // NOTE(review): assumes the incident traversal contains a VertexStep; otherwise Optional.get() throws
+    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
+        final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
+        return step.getDirection().opposite();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
new file mode 100644
index 0000000..e2252fa
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link MapReduce.ReduceEmitter} that collects reduce output as {@link Tuple2} pairs until the collected pairs
+ * are drained with {@link #getEmissions()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
+
+    // reduce output gathered since the last drain
+    private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
+
+    @Override
+    public void emit(final OK key, final OV value) {
+        final Tuple2<OK, OV> pair = new Tuple2<>(key, value);
+        this.emissions.add(pair);
+    }
+
+    /**
+     * Returns the collected pairs and replaces the buffer with a new, empty list.
+     *
+     * @return an iterator over the pairs emitted since the previous drain
+     */
+    public Iterator<Tuple2<OK, OV>> getEmissions() {
+        final List<Tuple2<OK, OV>> drained = this.emissions;
+        this.emissions = new ArrayList<>();
+        return drained.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
new file mode 100644
index 0000000..adb080b
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputFormatRDD implements InputRDD {
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+        return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+                NullWritable.class,
+                VertexWritable.class)
+                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+                .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
new file mode 100644
index 0000000..19d79a8
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
+ * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface InputRDD {
+
+    /**
+     * Read the graphRDD from the underlying graph system.
+     *
+     * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer}.
+     * @param sparkContext  the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
+     * @return an adjacency list representation of the underlying graph system.
+     */
+    JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
+}