You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/03 21:50:01 UTC

incubator-tinkerpop git commit: SparkGraphComputer implemented. There is still lots of cleanup and some optimizations that can be added, but the semantics are correct and 90 percent of the test cases are passing. Having some weird serialization issues on some random tests---don't get why.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/spark 96ffd77cc -> 8246ee6d5


SparkGraphComputer implemented. There is still lots of cleanup and some optimizations that can be added, but the semantics are correct and 90 percent of the test cases are passing. Having some weird serialization issues on some random tests---don't get why.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8246ee6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8246ee6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8246ee6d

Branch: refs/heads/spark
Commit: 8246ee6d52c1520fb66796ec04c6e969d6d72d3d
Parents: 96ffd77
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 13:50:08 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 13:50:08 2015 -0700

----------------------------------------------------------------------
 .../process/computer/util/DefaultMemory.java    | 109 +++++++++++
 .../structure/util/detached/DetachedVertex.java |   2 +-
 hadoop-gremlin/conf/spark-gryo.properties       |  42 +++++
 hadoop-gremlin/conf/spark-kryo.properties       |  38 ----
 .../tinkerpop/gremlin/hadoop/Constants.java     |   1 +
 .../computer/example/TraversalSupplier1.java    |   7 +-
 .../process/computer/spark/RuleAccumulator.java |  45 +++++
 .../spark/SerializableConfiguration.java        |  10 +
 .../computer/spark/SparkGraphComputer.java      | 187 ++++++++++---------
 .../process/computer/spark/SparkMapEmitter.java |   2 +-
 .../process/computer/spark/SparkMemory.java     |  70 +++----
 .../process/computer/spark/SparkVertex.java     |  17 ++
 .../hadoop/process/computer/util/Rule.java      |  77 ++++++++
 .../hadoop/structure/HadoopConfiguration.java   |  20 +-
 .../gremlin/hadoop/structure/HadoopGraph.java   |   8 +-
 .../gremlin/hadoop/HadoopGraphProvider.java     |  16 +-
 ...HadoopGraphProcessComputerIntegrateTest.java |  32 ----
 ...GiraphGraphComputerProcessIntegrateTest.java |  32 ++++
 .../computer/giraph/GiraphGraphProvider.java    |  58 ++++++
 ...GraphComputerGroovyProcessIntegrateTest.java |  33 ++++
 .../SparkGraphComputerProcessIntegrateTest.java |  31 +++
 .../computer/spark/SparkGraphProvider.java      |  52 ++++++
 ...GraphComputerGroovyProcessIntegrateTest.java |  34 ++++
 ...GraphGroovyProcessComputerIntegrateTest.java |  33 ----
 24 files changed, 707 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/DefaultMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/DefaultMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/DefaultMemory.java
new file mode 100644
index 0000000..9fee439
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/DefaultMemory.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.util;
+
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class DefaultMemory implements Memory.Admin {
+
+    private Map<String, Object> memory = new HashMap<>();
+    private int iteration = 0;
+    private long runtime = 0l;
+
+    public DefaultMemory() {
+
+    }
+
+    public DefaultMemory(final Memory.Admin copyMemory) {
+        this.iteration = copyMemory.getIteration();
+        this.runtime = copyMemory.getRuntime();
+        copyMemory.keys().forEach(key -> this.memory.put(key, copyMemory.get(key)));
+    }
+
+    @Override
+    public void setIteration(final int iteration) {
+        this.iteration = iteration;
+    }
+
+    @Override
+    public void setRuntime(final long runtime) {
+        this.runtime = runtime;
+    }
+
+    @Override
+    public Set<String> keys() {
+        return Collections.unmodifiableSet(this.memory.keySet());
+    }
+
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        final R r = (R) this.memory.get(key);
+        if (null == r)
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        else
+            return r;
+    }
+
+    @Override
+    public void set(final String key, final Object value) {
+        this.memory.put(key, value);
+    }
+
+    @Override
+    public int getIteration() {
+        return this.iteration;
+    }
+
+    @Override
+    public long getRuntime() {
+        return this.runtime;
+    }
+
+    @Override
+    public long incr(final String key, final long delta) {
+        final Long value = (Long) this.memory.get(key);
+        final Long newValue = (null == value) ? delta : delta + value;
+        this.memory.put(key, newValue);
+        return newValue;
+    }
+
+    @Override
+    public boolean and(final String key, final boolean bool) {
+        final Boolean value = (Boolean) this.memory.get(key);
+        final Boolean newValue = (null == value) ? bool : bool && value;
+        this.memory.put(key, newValue);
+        return newValue;
+    }
+
+    @Override
+    public boolean or(final String key, final boolean bool) {
+        final Boolean value = (Boolean) this.memory.get(key);
+        final Boolean newValue = (null == value) ? bool : bool || value;
+        this.memory.put(key, newValue);
+        return newValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertex.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertex.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertex.java
index 2517986..6469a62 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertex.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertex.java
@@ -118,7 +118,7 @@ public class DetachedVertex extends DetachedElement<Vertex> implements Vertex, V
         if (hostVertex.equals(this))
             return hostVertex;
         else
-            throw new IllegalStateException("The host vertex must be the detached vertex to attach: " + this);
+            throw new IllegalStateException("The host vertex must be the detached vertex to attach: " + this + "!=" + hostVertex);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/conf/spark-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/spark-gryo.properties b/hadoop-gremlin/conf/spark-gryo.properties
new file mode 100644
index 0000000..a25482a
--- /dev/null
+++ b/hadoop-gremlin/conf/spark-gryo.properties
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
+gremlin.hadoop.defaultGraphComputer=org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer
+gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
+gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
+gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+gremlin.hadoop.deriveMemory=false
+gremlin.hadoop.jarsInDistributedCache=false
+
+gremlin.hadoop.inputLocation=hdfs://localhost:9000/user/marko/tinkerpop-modern-vertices.kryo
+gremlin.hadoop.outputLocation=output
+
+# the vertex program to execute
+# gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram
+gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram
+gremlin.traversalVertexProgram.traversalSupplier.type=CLASS
+gremlin.traversalVertexProgram.traversalSupplier.object=org.apache.tinkerpop.gremlin.hadoop.process.computer.example.TraversalSupplier1
+
+# It is possible to provide Spark configuration parameters for use with SparkGraphComputer
+##########################################################################################
+spark.master=local[4]
+spark.executor.memory=1024m
+spark.eventLog.enabled=true
+spark.serializer=org.apache.spark.serializer.JavaSerializer
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/conf/spark-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/spark-kryo.properties b/hadoop-gremlin/conf/spark-kryo.properties
deleted file mode 100644
index 85426a0..0000000
--- a/hadoop-gremlin/conf/spark-kryo.properties
+++ /dev/null
@@ -1,38 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
-gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
-gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
-gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.deriveMemory=false
-gremlin.hadoop.jarsInDistributedCache=false
-
-gremlin.hadoop.inputLocation=hdfs://localhost:9000/user/marko/tinkerpop-modern-vertices.kryo
-gremlin.hadoop.outputLocation=output
-
-# the vertex program to execute
-gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram
-
-# It is possible to provide Spark configuration parameters for use with SparkGraphComputer
-##########################################################################################
-spark.master=local[4]
-spark.executor.memory=1024m
-spark.eventLog.enabled=true
-spark.serializer=org.apache.spark.serializer.JavaSerializer
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index bf06fcc..60ef636 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -30,6 +30,7 @@ public class Constants {
     public static final String GREMLIN_HADOOP_GRAPH_INPUT_FORMAT = "gremlin.hadoop.graphInputFormat";
     public static final String GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT = "gremlin.hadoop.graphOutputFormat";
     public static final String GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT = "gremlin.hadoop.memoryOutputFormat";
+    public static final String GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER = "gremlin.hadoop.defaultGraphComputer";
 
     public static final String GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE = "gremlin.hadoop.jarsInDistributedCache";
     public static final String SYSTEM_G = Graph.Hidden.hide("g");

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/example/TraversalSupplier1.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/example/TraversalSupplier1.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/example/TraversalSupplier1.java
index 1779809..dd9b5b7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/example/TraversalSupplier1.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/example/TraversalSupplier1.java
@@ -20,6 +20,9 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.example;
 
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.process.Traversal;
+import org.apache.tinkerpop.gremlin.process.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.structure.Graph;
 
 import java.util.function.Supplier;
 
@@ -29,6 +32,8 @@ import java.util.function.Supplier;
 public class TraversalSupplier1 implements Supplier<Traversal> {
     @Override
     public Traversal get() {
-        return HadoopGraph.open().V().out().out().values("name");
+        final Graph graph = HadoopGraph.open();
+        graph.engine(ComputerTraversalEngine.computer);
+        return graph.V().out().out().values("name");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
new file mode 100644
index 0000000..59da2f4
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.spark.AccumulatorParam;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class RuleAccumulator implements AccumulatorParam<Rule> {
+
+    @Override
+    public Rule addAccumulator(final Rule a, final Rule b) {
+        return new Rule(b.operation, b.operation.compute(a.object, b.object));
+    }
+
+    @Override
+    public Rule addInPlace(final Rule a, final Rule b) {
+        return new Rule(b.operation, b.operation.compute(a.object, b.object));
+    }
+
+    @Override
+    public Rule zero(final Rule rule) {
+        return new Rule(Rule.Operation.NO_OP, null);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
index b4a8005..73e3d08 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
@@ -19,6 +19,8 @@
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
 import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationUtils;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -32,6 +34,14 @@ public final class SerializableConfiguration extends AbstractConfiguration imple
 
     private final Map<String, Object> configurations = new HashMap<>();
 
+    public SerializableConfiguration() {
+
+    }
+
+    public SerializableConfiguration(final Configuration configuration) {
+        ConfigurationUtils.copy(configuration, this);
+    }
+
     @Override
     protected void addPropertyDirect(final String key, final Object value) {
         this.configurations.put(key, value);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 7cace20..946d2af 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
+import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.FileConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
@@ -41,9 +42,10 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankMapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.util.DefaultMemory;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
@@ -54,10 +56,8 @@ import scala.Tuple2;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -119,11 +119,12 @@ public class SparkGraphComputer implements GraphComputer {
         if (null != this.vertexProgram)
             GraphComputerHelper.validateProgramOnComputer(this, vertexProgram);
 
+        final org.apache.commons.configuration.Configuration apacheConfiguration = this.hadoopGraph.configuration();
         final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.hadoopGraph.configuration());
-        final SparkMemory memory = new SparkMemory(Collections.emptySet());
 
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
                     final long startTime = System.currentTimeMillis();
+                    SparkMemory memory = null;
                     // load the graph
                     if (null != this.vertexProgram) {
                         final SparkConf sparkConfiguration = new SparkConf();
@@ -135,49 +136,57 @@ public class SparkGraphComputer implements GraphComputer {
                         // set up the input format
                         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
-                        final JavaPairRDD<NullWritable, VertexWritable> rdd = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
-                                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
-                                NullWritable.class,
-                                VertexWritable.class);
-                        final JavaPairRDD<Object, SparkMessenger<Object>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
-                        GraphComputerRDD<Object> g = GraphComputerRDD.of(rdd2);
+                        ///
+                        try {
+                            final JavaPairRDD<NullWritable, VertexWritable> rdd = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                                    (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+                                    NullWritable.class,
+                                    VertexWritable.class);
+                            final JavaPairRDD<Object, SparkMessenger<Object>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
+                            GraphComputerRDD<Object> g = GraphComputerRDD.of(rdd2);
 
-                        // set up the vertex program
-                        this.vertexProgram.setup(memory);
-                        final org.apache.commons.configuration.Configuration vertexProgramConfiguration = new SerializableConfiguration();
-                        this.vertexProgram.storeState(vertexProgramConfiguration);
-
-                        // execute the vertex program
-                        while (true) {
-                            g = g.execute(vertexProgramConfiguration, memory);
-                            g.foreachPartition(iterator -> doNothing());
-                            memory.incrIteration();
-                            if (this.vertexProgram.terminate(memory))
-                                break;
-                        }
-                        // write the output graph back to disk
-                        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-                        if (null != outputLocation) {
-                            try {
-                                FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
-                            } catch (final IOException e) {
-                                throw new IllegalStateException(e.getMessage(), e);
+                            // set up the vertex program
+                            memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
+                            this.vertexProgram.setup(memory);
+                            final SerializableConfiguration vertexProgramConfiguration = new SerializableConfiguration();
+                            this.vertexProgram.storeState(vertexProgramConfiguration);
+                            this.mapReducers.addAll(this.vertexProgram.getMapReducers());
+                            ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+                            ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+                            // execute the vertex program
+                            while (true) {
+                                g = g.execute(vertexProgramConfiguration, memory);
+                                g.foreachPartition(iterator -> doNothing());
+                                memory.incrIteration();
+                                if (this.vertexProgram.terminate(memory))
+                                    break;
+                            }
+                            // write the output graph back to disk
+                            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+                            if (null != outputLocation) {
+                                try {
+                                    FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
+                                } catch (final IOException e) {
+                                    throw new IllegalStateException(e.getMessage(), e);
+                                }
+                                // map back to a <nullwritable,vertexwritable> stream for output
+                                g.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().vertex)))
+                                        .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
+                                                NullWritable.class,
+                                                VertexWritable.class,
+                                                (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
                             }
-                            // map back to a <nullwritable,vertexwritable> stream for output
-                            g.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().vertex)))
-                                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
-                                            NullWritable.class,
-                                            VertexWritable.class,
-                                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
+                        } finally {
+                            sparkContext.close();
                         }
-                        sparkContext.close();
                     }
 
+                    final Memory.Admin finalMemory = null == memory ? new DefaultMemory() : new DefaultMemory(memory);
                     // execute mapreduce jobs
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // set up the map reduce job
-                        final org.apache.commons.configuration.Configuration mapReduceConfiguration = new SerializableConfiguration();
-                        mapReduce.storeState(mapReduceConfiguration);
+                        final SerializableConfiguration newConfiguration = new SerializableConfiguration(apacheConfiguration);
+                        mapReduce.storeState(newConfiguration);
 
                         // set up spark job
                         final SparkConf sparkConfiguration = new SparkConf();
@@ -188,59 +197,61 @@ public class SparkGraphComputer implements GraphComputer {
                         // set up the input format
                         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
-                        final JavaPairRDD<NullWritable, VertexWritable> g = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
-                                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
-                                NullWritable.class,
-                                VertexWritable.class);
+                        try {
+                            final JavaPairRDD<NullWritable, VertexWritable> g = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                                    (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+                                    NullWritable.class,
+                                    VertexWritable.class);
 
-                        // map
-                        JavaPairRDD<?, ?> mapRDD = g.flatMapToPair(tuple -> {
-                            final MapReduce m = MapReduce.createMapReduce(mapReduceConfiguration);
-                            final SparkMapEmitter mapEmitter = new SparkMapEmitter();
-                            m.map(tuple._2().get(), mapEmitter);
-                            return mapEmitter.getEmissions();
-                        });
-                        if (mapReduce.getMapKeySort().isPresent())
-                            mapRDD = mapRDD.sortByKey((Comparator) mapReduce.getMapKeySort().get());
-                        // todo: combine
-                        // reduce
-                        JavaPairRDD<?, ?> reduceRDD = null;
-                        if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
-                            reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
-                                final MapReduce m = MapReduce.createMapReduce(mapReduceConfiguration);
-                                final SparkReduceEmitter reduceEmitter = new SparkReduceEmitter();
-                                m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
-                                return reduceEmitter.getEmissions();
+                            // map
+                            JavaPairRDD<?, ?> mapRDD = g.flatMapToPair(tuple -> {
+                                final MapReduce m = MapReduce.createMapReduce(newConfiguration);
+                                final SparkMapEmitter mapEmitter = new SparkMapEmitter();
+                                m.map(tuple._2().get(), mapEmitter);
+                                return mapEmitter.getEmissions();
                             });
-                            if (mapReduce.getReduceKeySort().isPresent())
-                                reduceRDD = reduceRDD.sortByKey((Comparator) mapReduce.getReduceKeySort().get());
-                        }
-                        // write the output graph back to disk
-                        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-                        if (null != outputLocation) {
-                            // map back to a <nullwritable,vertexwritable> stream for output
-                            ((null == reduceRDD) ? mapRDD : reduceRDD).mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
-                                    ObjectWritable.class,
-                                    ObjectWritable.class,
-                                    (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
-                            // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
-                            // to get results you have to look through HDFS directory structure. Oh the horror.
-                            try {
-                                if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
-                                    mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
-                                else
-                                    HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
-                            } catch (final IOException e) {
-                                throw new IllegalStateException(e.getMessage(), e);
+                            if (mapReduce.getMapKeySort().isPresent())
+                                mapRDD = mapRDD.sortByKey((Comparator) mapReduce.getMapKeySort().get());
+                            // todo: combine
+                            // reduce
+                            JavaPairRDD<?, ?> reduceRDD = null;
+                            if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
+                                reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
+                                    final MapReduce m = MapReduce.createMapReduce(newConfiguration);
+                                    final SparkReduceEmitter reduceEmitter = new SparkReduceEmitter();
+                                    m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
+                                    return reduceEmitter.getEmissions();
+                                });
+                                if (mapReduce.getReduceKeySort().isPresent())
+                                    reduceRDD = reduceRDD.sortByKey((Comparator) mapReduce.getReduceKeySort().get());
+                            }
+                            // write the output graph back to disk
+                            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+                            if (null != outputLocation) {
+                                // map back to a Hadoop stream for output
+                                ((null == reduceRDD) ? mapRDD : reduceRDD).mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+                                        ObjectWritable.class,
+                                        ObjectWritable.class,
+                                        (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
+                                // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
+                                // to get results you have to look through HDFS directory structure. Oh the horror.
+                                try {
+                                    if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
+                                        mapReduce.addResultToMemory(finalMemory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+                                    else
+                                        HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+                                } catch (final IOException e) {
+                                    throw new IllegalStateException(e.getMessage(), e);
+                                }
                             }
+                        } finally {
+                            sparkContext.close();
                         }
-                        sparkContext.close();
                     }
 
                     // update runtime and return the newly computed graph
-                    memory.setRuntime(System.currentTimeMillis() - startTime);
-                    memory.complete();
-                    return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph), memory.asImmutable());
+                    finalMemory.setRuntime(System.currentTimeMillis() - startTime);
+                    return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph), finalMemory.asImmutable());
                 }
         );
     }
@@ -270,13 +281,13 @@ public class SparkGraphComputer implements GraphComputer {
     /////////////////
 
     public static void main(final String[] args) throws Exception {
-        final FileConfiguration configuration = new PropertiesConfiguration("/Users/marko/software/tinkerpop/tinkerpop3/hadoop-gremlin/conf/spark-kryo.properties");
+        final FileConfiguration configuration = new PropertiesConfiguration("/Users/marko/software/tinkerpop/tinkerpop3/hadoop-gremlin/conf/spark-gryo.properties");
         // TODO: final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
         final HadoopGraph graph = HadoopGraph.open(configuration);
-        final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).mapReduce(PageRankMapReduce.build().create()).submit().get();
+        final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).submit().get();
         // TODO: remove everything below
         System.out.println(result);
-        result.memory().<Iterator>get(PageRankMapReduce.DEFAULT_MEMORY_KEY).forEachRemaining(System.out::println);
+        //result.memory().<Iterator>get(PageRankMapReduce.DEFAULT_MEMORY_KEY).forEachRemaining(System.out::println);
         //result.graph().configuration().getKeys().forEachRemaining(key -> System.out.println(key + "-->" + result.graph().configuration().getString(key)));
         result.graph().V().valueMap().forEachRemaining(System.out::println);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
index 3a4a424..0f5acc1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
@@ -33,7 +33,7 @@ public class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
 
     @Override
     public void emit(final K key, final V value) {
-        emissions.add(new Tuple2<>(key, value));
+        this.emissions.add(new Tuple2<>(key, value));
     }
 
     public Iterable<Tuple2<K, V>> getEmissions() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index eb2af7f..b277e83 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -18,18 +18,22 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -39,28 +43,34 @@ import java.util.concurrent.atomic.AtomicLong;
 public final class SparkMemory implements Memory.Admin, Serializable {
 
     public final Set<String> memoryKeys = new HashSet<>();
-    public Map<String, Object> previousMap;
-    public Map<String, Object> currentMap;
     private final AtomicInteger iteration = new AtomicInteger(0);
     private final AtomicLong runtime = new AtomicLong(0l);
-
-    public SparkMemory(final Set<MapReduce> mapReducers) {
-        this.currentMap = new ConcurrentHashMap<>();
-        this.previousMap = new ConcurrentHashMap<>();
-        //if (null != vertexProgram) {
-        for (final String key : (Set<String>) PageRankVertexProgram.build().create().getMemoryComputeKeys()) {
-            MemoryHelper.validateKey(key);
-            this.memoryKeys.add(key);
+    private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
+
+    public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
+        if (null != vertexProgram) {
+            for (final String key : vertexProgram.getMemoryComputeKeys()) {
+                MemoryHelper.validateKey(key);
+                this.memoryKeys.add(key);
+            }
         }
-        //}
         for (final MapReduce mapReduce : mapReducers) {
             this.memoryKeys.add(mapReduce.getMemoryKey());
         }
+        for (final String key : this.memoryKeys) {
+            this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), new RuleAccumulator()));
+        }
+
     }
 
     @Override
     public Set<String> keys() {
-        return this.previousMap.keySet();
+        final Set<String> presentKeys = new HashSet<>(); // keys whose accumulator currently holds a non-null object
+        for (final Map.Entry<String, Accumulator<Rule>> entry : this.memory.entrySet()) {
+            if (null != entry.getValue().value().object)
+                presentKeys.add(entry.getKey());
+        }
+        return Collections.unmodifiableSet(presentKeys);
     }
 
     @Override
@@ -88,16 +98,6 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         return this.runtime.get();
     }
 
-    protected void complete() {
-        this.iteration.decrementAndGet();
-        this.previousMap = this.currentMap;
-    }
-
-    protected void completeSubRound() {
-        this.previousMap = new ConcurrentHashMap<>(this.currentMap);
-
-    }
-
     @Override
     public boolean isInitialIteration() {
         return this.getIteration() == 0;
@@ -105,7 +105,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
 
     @Override
     public <R> R get(final String key) throws IllegalArgumentException {
-        final R r = (R) this.previousMap.get(key);
+        final R r = (R) this.memory.get(key).value().object;
         if (null == r)
             throw Memory.Exceptions.memoryDoesNotExist(key);
         else
@@ -115,28 +115,28 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     @Override
     public long incr(final String key, final long delta) {
         checkKeyValue(key, delta);
-        this.currentMap.compute(key, (k, v) -> null == v ? delta : delta + (Long) v);
-        return (Long) this.previousMap.getOrDefault(key, 0l) + delta;
+        this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta)); // hand an INCR rule carrying delta to this key's accumulator
+        return (Long) this.memory.get(key).value().object + delta; // NOTE(review): read after add(); if value() already reflects delta it is counted twice, and a null object fails on unboxing - confirm against RuleAccumulator
     }
 
     @Override
     public boolean and(final String key, final boolean bool) {
         checkKeyValue(key, bool);
-        this.currentMap.compute(key, (k, v) -> null == v ? bool : bool && (Boolean) v);
-        return (Boolean) this.previousMap.getOrDefault(key, true) && bool;
+        this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
+        return bool;
     }
 
     @Override
     public boolean or(final String key, final boolean bool) {
         checkKeyValue(key, bool);
-        this.currentMap.compute(key, (k, v) -> null == v ? bool : bool || (Boolean) v);
-        return (Boolean) this.previousMap.getOrDefault(key, true) || bool;
+        this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
+        return bool;
     }
 
     @Override
     public void set(final String key, final Object value) {
         checkKeyValue(key, value);
-        this.currentMap.put(key, value);
+        this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
     }
 
     @Override
@@ -145,8 +145,8 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     }
 
     private void checkKeyValue(final String key, final Object value) {
-        //if (!this.memoryKeys.contains(key))
-        //    throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
-        //MemoryHelper.validateValue(value);
+        if (!this.memoryKeys.contains(key))
+            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
+        MemoryHelper.validateValue(value);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
index 0a02156..fc6c4f2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
@@ -25,6 +25,8 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
 
@@ -103,6 +105,21 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
         return this.vertex.iterators().propertyIterator(propertyKeys);
     }
 
+    @Override
+    public String toString() {
+        return StringFactory.vertexString(this);
+    }
+
+    @Override
+    public int hashCode() {
+        return ElementHelper.hashCode(this);
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        return ElementHelper.areEqual(this, other);
+    }
+
     ///////////////////////////////
 
     private void writeObject(final ObjectOutputStream outputStream) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
new file mode 100644
index 0000000..16e2189
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.util;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class Rule implements Serializable {
+
+    /** Ways of folding two values into one; OR, AND and INCR return the other argument when one of them is null. */
+    public enum Operation {
+        OR {
+            @Override
+            public Boolean compute(final Object first, final Object second) {
+                if (null == first) return (Boolean) second;
+                if (null == second) return (Boolean) first;
+                return (Boolean) first || (Boolean) second;
+            }
+        }, AND {
+            @Override
+            public Boolean compute(final Object first, final Object second) {
+                if (null == first) return (Boolean) second;
+                if (null == second) return (Boolean) first;
+                return (Boolean) first && (Boolean) second;
+            }
+        }, INCR {
+            @Override
+            public Long compute(final Object first, final Object second) {
+                if (null == first) return (Long) second;
+                if (null == second) return (Long) first;
+                return (Long) first + (Long) second;
+            }
+        }, SET {
+            // the second argument always wins, null included
+            @Override
+            public Object compute(final Object first, final Object second) {
+                return second;
+            }
+        }, NO_OP {
+            // the first argument is kept unless it is null
+            @Override
+            public Object compute(final Object first, final Object second) {
+                return null != first ? first : second;
+            }
+        };
+
+        /** Folds {@code first} and {@code second} into a single result according to this operation. */
+        public abstract Object compute(final Object first, final Object second);
+    }
+
+    // a rule is nothing more than an operation paired with the object it carries
+    public final Operation operation;
+    public final Object object;
+
+    public Rule(final Operation operation, final Object object) {
+        this.operation = operation;
+        this.object = object;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index 40b786e..76636cd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -18,14 +18,16 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure;
 
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.util.StreamFactory;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.util.StreamFactory;
 import org.javatuples.Pair;
 
 import java.io.Serializable;
@@ -84,6 +86,18 @@ public class HadoopConfiguration extends BaseConfiguration implements Serializab
         this.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
     }
 
+    /** Resolves the class named under GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER; GiraphGraphComputer when the key is absent. */
+    public Class<? extends GraphComputer> getGraphComputer() {
+        if (!this.containsKey(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER))
+            return GiraphGraphComputer.class;
+        try {
+            // asSubclass() fails fast with a ClassCastException on a class that is not a GraphComputer; a raw cast would let it through
+            return Class.forName(this.getString(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER)).asSubclass(GraphComputer.class);
+        } catch (final ClassNotFoundException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
     @Override
     public Iterator iterator() {
         return StreamFactory.stream(this.getKeys()).map(k -> new Pair(k, this.getProperty(k))).iterator();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 7feb9e3..8154888 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.process.graph.traversal.strategy.HadoopElementStepStrategy;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopEdgeIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopVertexIterator;
@@ -141,9 +142,11 @@ public class HadoopGraph implements Graph, Graph.Iterators {
 
     protected final HadoopConfiguration configuration;
     private TraversalEngine traversalEngine = StandardTraversalEngine.standard;
+    private Class<? extends GraphComputer> graphComputerClass = GiraphGraphComputer.class;
 
     private HadoopGraph(final Configuration configuration) {
         this.configuration = new HadoopConfiguration(configuration);
+        this.graphComputerClass = this.configuration.getGraphComputer();
     }
 
     public static HadoopGraph open() {
@@ -161,13 +164,14 @@ public class HadoopGraph implements Graph, Graph.Iterators {
 
     @Override
     public void compute(final Class<? extends GraphComputer> graphComputerClass) {
-        if (!graphComputerClass.equals(GiraphGraphComputer.class))
+        if (!graphComputerClass.equals(GiraphGraphComputer.class) && !graphComputerClass.equals(SparkGraphComputer.class)) // unsupported only when it is neither of the two known computers
             throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass);
+        this.graphComputerClass = graphComputerClass; // remembered so the no-arg compute() instantiates this computer
     }
 
     @Override
     public GraphComputer compute() {
-        return new GiraphGraphComputer(this);
+        return this.graphComputerClass.equals(GiraphGraphComputer.class) ? new GiraphGraphComputer(this) : new SparkGraphComputer(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 300def1..69c0b36 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -19,12 +19,10 @@
 package org.apache.tinkerpop.gremlin.hadoop;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.giraph.conf.GiraphConstants;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.TestHelper;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopElement;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -96,23 +94,11 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
     @Override
     public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName) {
         return new HashMap<String, Object>() {{
-            put("gremlin.graph", HadoopGraph.class.getName());
+            put(Graph.GRAPH, HadoopGraph.class.getName());
             put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
             put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
-            //put(Constants.GREMLIN_GIRAPH_MEMORY_OUTPUT_FORMAT_CLASS, TextOutputFormat.class.getCanonicalName());
             put(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class.getCanonicalName());
-            put(GiraphConstants.MIN_WORKERS, 1);
-            put(GiraphConstants.MAX_WORKERS, 1);
-            put(GiraphConstants.SPLIT_MASTER_WORKER.getKey(), false);
-            //put("giraph.localTestMode", true);
-            put(GiraphConstants.ZOOKEEPER_JAR, GiraphGraphComputer.class.getResource("zookeeper-3.3.3.jar").getPath());
-            put("giraph.zkServerPort", "2181");  // you must have a local zookeeper running on this port
-            put("giraph.nettyServerUseExecutionHandler", false); // this prevents so many integration tests running out of threads
-            put("giraph.nettyClientUseExecutionHandler", false); // this prevents so many integration tests running out of threads
-            //put(Constants.GREMLIN_GIRAPH_INPUT_LOCATION, GryoInputFormat.class.getResource("tinkerpop-classic-vertices.kryo").getPath());
             put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
-            put(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, true);
-            put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         }};
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/HadoopGraphProcessComputerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/HadoopGraphProcessComputerIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/HadoopGraphProcessComputerIntegrateTest.java
deleted file mode 100644
index 653a713..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/HadoopGraphProcessComputerIntegrateTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process;
-
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(ProcessComputerSuite.class)
-@ProcessComputerSuite.GraphProviderClass(provider = HadoopGraphProvider.class, graph = HadoopGraph.class)
-public class HadoopGraphProcessComputerIntegrateTest {
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputerProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputerProcessIntegrateTest.java
new file mode 100644
index 0000000..7c9eae7
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputerProcessIntegrateTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
+
+import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs {@link ProcessComputerSuite} on a {@link HadoopGraph} that is configured by {@link GiraphGraphProvider}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessComputerSuite.class)
+@ProcessComputerSuite.GraphProviderClass(provider = GiraphGraphProvider.class, graph = HadoopGraph.class)
+public class GiraphGraphComputerProcessIntegrateTest {}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphProvider.java
new file mode 100644
index 0000000..5650ae5
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test provider that supplies the {@link HadoopGraph} base configuration plus the Giraph worker, ZooKeeper and Netty settings.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GiraphGraphProvider extends HadoopGraphProvider {
+    /** Returns a new plain {@link HashMap} per call (no anonymous subclass, so no hidden reference to this provider); the arguments are unused. */
+    @Override
+    public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName) {
+        final Map<String, Object> configuration = new HashMap<>();
+        configuration.put(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.put(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class.getCanonicalName());
+        configuration.put(GiraphConstants.MIN_WORKERS, 1);
+        configuration.put(GiraphConstants.MAX_WORKERS, 1);
+        configuration.put(GiraphConstants.SPLIT_MASTER_WORKER.getKey(), false);
+        configuration.put(GiraphConstants.ZOOKEEPER_JAR, GiraphGraphComputer.class.getResource("zookeeper-3.3.3.jar").getPath());
+        configuration.put("giraph.zkServerPort", "2181");  // requires a ZooKeeper server already running locally on this port
+        configuration.put("giraph.nettyServerUseExecutionHandler", false); // keeps the integration tests from running out of threads
+        configuration.put("giraph.nettyClientUseExecutionHandler", false); // keeps the integration tests from running out of threads
+        configuration.put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
+        configuration.put(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, true);
+        configuration.put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        return configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
new file mode 100644
index 0000000..3e909b4
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/groovy/GiraphGraphComputerGroovyProcessIntegrateTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.groovy;
+
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs {@link GroovyProcessComputerSuite} on a {@link HadoopGraph} that is configured by {@link GiraphGraphProvider}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(GroovyProcessComputerSuite.class)
+@ProcessComputerSuite.GraphProviderClass(provider = GiraphGraphProvider.class, graph = HadoopGraph.class)
+public class GiraphGraphComputerGroovyProcessIntegrateTest {}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
new file mode 100644
index 0000000..91a7fc5
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs {@link ProcessComputerSuite} on a {@link HadoopGraph} that is configured by {@link SparkGraphProvider}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessComputerSuite.class)
+@ProcessComputerSuite.GraphProviderClass(provider = SparkGraphProvider.class, graph = HadoopGraph.class)
+public class SparkGraphComputerProcessIntegrateTest {}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphProvider.java
new file mode 100644
index 0000000..255b0a5
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test provider that supplies the {@link HadoopGraph} base configuration and selects {@link SparkGraphComputer} on a local Spark master.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkGraphProvider extends HadoopGraphProvider {
+    /** Returns a new plain {@link HashMap} per call (no anonymous subclass, so no hidden reference to this provider); the arguments are unused. */
+    @Override
+    public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName) {
+        final Map<String, Object> configuration = new HashMap<>();
+        configuration.put(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.put(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class.getCanonicalName());
+        configuration.put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
+        configuration.put(Constants.GREMLIN_HADOOP_DERIVE_MEMORY, true);
+        configuration.put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.put(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName()); // Spark-specific: default computer
+        configuration.put("spark.master", "local[4]"); // Spark-specific: in-process master, 4 worker threads per Spark's local[K] syntax
+        return configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
new file mode 100644
index 0000000..e7b843a
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.groovy;
+
+import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.junit.runner.RunWith;
+
+/**
+ * Runs {@link GroovyProcessComputerSuite} on a {@link HadoopGraph} that is configured by {@link SparkGraphProvider}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(GroovyProcessComputerSuite.class)
+@ProcessComputerSuite.GraphProviderClass(provider = SparkGraphProvider.class, graph = HadoopGraph.class)
+public class SparkGraphComputerGroovyProcessIntegrateTest {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8246ee6d/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/groovy/HadoopGraphGroovyProcessComputerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/groovy/HadoopGraphGroovyProcessComputerIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/groovy/HadoopGraphGroovyProcessComputerIntegrateTest.java
deleted file mode 100644
index 3827a6f..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/groovy/HadoopGraphGroovyProcessComputerIntegrateTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.groovy;
-
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
-import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(GroovyProcessComputerSuite.class)
-@ProcessComputerSuite.GraphProviderClass(provider = HadoopGraphProvider.class, graph = HadoopGraph.class)
-public class HadoopGraphGroovyProcessComputerIntegrateTest {
-}