You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/01 16:15:52 UTC

incubator-tinkerpop git commit: Cleaned up the SparkGraphComputer algorithm with comments and some reorganization of code. A few minor optimizations here and there.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 1244a8fa8 -> 815a378f0


Cleaned up the SparkGraphComputer algorithm with comments and some reorganization of code. A few minor optimizations here and there.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/815a378f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/815a378f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/815a378f

Branch: refs/heads/master
Commit: 815a378f0b112e0d3034ab56a2ca8aeb846bdda1
Parents: 1244a8f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Apr 1 08:15:47 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Apr 1 08:15:47 2015 -0600

----------------------------------------------------------------------
 .../gremlin/process/computer/GraphComputer.java |   7 +-
 .../process/computer/spark/SparkExecutor.java   | 104 ++++++++++------
 .../computer/spark/SparkGraphComputer.java      |  65 ++++++----
 .../computer/spark/SparkMessagePayload.java     |  49 --------
 .../process/computer/spark/SparkPayload.java    |  48 --------
 .../computer/spark/SparkVertexPayload.java      | 122 -------------------
 6 files changed, 111 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index 1f2b3f3..e847772 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -129,9 +129,6 @@ public interface GraphComputer {
     }
 
     public interface Features {
-        public default boolean supportsWorkerPersistenceBetweenIterations() {
-            return true;
-        }
 
         public default boolean supportsGlobalMessageScopes() {
             return true;
@@ -173,6 +170,10 @@ public interface GraphComputer {
             return true;
         }
 
+        public default boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
+            return true;
+        }
+
         public default boolean supportsIsolation(final Isolation isolation) {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 72aa4d2..5e4996a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -30,11 +30,14 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -46,7 +49,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -56,58 +58,68 @@ public final class SparkExecutor {
     private SparkExecutor() {
     }
 
-    public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessageRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+    ////////////////////
+    // VERTEX PROGRAM //
+    ////////////////////
 
-        // execute vertex program iteration
-        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndOutgoingMessagesRDD = null == viewAndMessageRDD ?
+    public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration(
+            final JavaPairRDD<Object, VertexWritable> graphRDD,
+            final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessagesRDD,
+            final SparkMemory memory,
+            final Configuration apacheConfiguration) {
+
+        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndOutgoingMessagesRDD = null == viewAndMessagesRDD ?
+                // if this is the first iteration, there are no views or incoming messages
                 graphRDD.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
-                    final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
-                    final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
-                    workerVertexProgram.workerIterationStart(memory);
+                    final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();  // the compute keys as a set
+                    final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+                    workerVertexProgram.workerIterationStart(memory); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexWritable -> {
                         final Vertex vertex = vertexWritable._2().get();
-                        final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, Collections.emptyList());
-                        workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory);
-                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages();
-                        final List<DetachedVertexProperty<Object>> newView = new ArrayList<>();
-                        if (null != elementComputeKeysArray)
+                        final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, Collections.emptyList());  // create the messenger with no incoming messages
+                        workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex
+                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+                        final List<DetachedVertexProperty<Object>> newView = new ArrayList<>(); // get the computed view
+                        if (null != elementComputeKeysArray) // not all vertex programs have compute keys
                             vertex.properties(elementComputeKeysArray).forEachRemaining(property -> newView.add(DetachedFactory.detach(property, true)));
                         if (!partitionIterator.hasNext())
-                            workerVertexProgram.workerIterationEnd(memory);
+                            workerVertexProgram.workerIterationEnd(memory);  // if no more vertices in the partition, end the worker's iteration
                         return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
                     });
                 }) :
-                graphRDD.leftOuterJoin(viewAndMessageRDD)
+                // join the view/messages to the graph
+                graphRDD.leftOuterJoin(viewAndMessagesRDD)
                         .mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
                             final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
-                            final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
-                            final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
-                            workerVertexProgram.workerIterationStart(memory);
+                            final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
+                            final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+                            workerVertexProgram.workerIterationStart(memory); // start the worker
                             return () -> IteratorUtils.map(partitionIterator, vertexWritableAndIncomingMessages -> {
                                 final Vertex vertex = vertexWritableAndIncomingMessages._2()._1().get();
                                 final List<M> incomingMessages = vertexWritableAndIncomingMessages._2()._2().isPresent() ? vertexWritableAndIncomingMessages._2()._2().get()._2() : Collections.emptyList();
                                 final List<DetachedVertexProperty<Object>> view = vertexWritableAndIncomingMessages._2()._2().isPresent() ? vertexWritableAndIncomingMessages._2()._2().get()._1() : Collections.emptyList();
-                                view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
-                                final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages);
-                                workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory);
-                                final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages();
-                                final List<DetachedVertexProperty<Object>> newView = new ArrayList<>();
-                                if (null != elementComputeKeysArray)
+                                view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));  // attach the view to the vertex
+                                final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
+                                workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex
+                                final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+                                final List<DetachedVertexProperty<Object>> newView = new ArrayList<>(); // get the computed view
+                                if (null != elementComputeKeysArray)  // not all vertex programs have compute keys
                                     vertex.properties(elementComputeKeysArray).forEachRemaining(property -> newView.add(DetachedFactory.detach(property, true)));
                                 if (!partitionIterator.hasNext())
-                                    workerVertexProgram.workerIterationEnd(memory);
+                                    workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
                                 return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
                             });
                         });
-        viewAndOutgoingMessagesRDD.cache();
+
+        viewAndOutgoingMessagesRDD.cache();  // will use twice (once for message passing and once for view isolation)
 
         // "message pass" by reducing on the vertex object id of the message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, List<M>> incomingMessages = viewAndOutgoingMessagesRDD
+        final JavaPairRDD<Object, List<M>> incomingMessagesRDD = viewAndOutgoingMessagesRDD
                 .mapValues(Tuple2::_2)
                 .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2().iterator(), message -> {
-                    final List<M> list = new ArrayList<>();
+                    final List<M> list = (null == messageCombiner) ? new ArrayList<>() : new ArrayList<>(1);
                     list.add(message._2());
                     return new Tuple2<>(message._1(), list);
                 })).reduceByKey((a, b) -> {
@@ -115,35 +127,49 @@ public final class SparkExecutor {
                         a.addAll(b);
                         return a;
                     } else {
-                        final M m = messageCombiner.combine(a.get(0), b.get(0));
-                        a.clear();
-                        b.clear();
-                        a.add(m);
+                        a.set(0, messageCombiner.combine(a.get(0), b.get(0)));
                         return a;
                     }
                 });
 
-        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> newViewMessageRDD = viewAndOutgoingMessagesRDD
+        // isolate the views and then join the incoming messages
+        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndIncomingMessagesRDD = viewAndOutgoingMessagesRDD
                 .mapValues(Tuple2::_1)
-                .fullOuterJoin(incomingMessages)
+                .fullOuterJoin(incomingMessagesRDD)
                 .mapValues(tuple -> new Tuple2<>(tuple._1().or(Collections.emptyList()), tuple._2().or(Collections.emptyList())));
 
-        newViewMessageRDD.foreachPartition(partitionIterator -> {
-        }); // need to complete a task so its BSP.
-        return newViewMessageRDD;
+        viewAndIncomingMessagesRDD.foreachPartition(partitionIterator -> {
+        }); // need to complete a task so it's BSP and the memory for this iteration is updated
+        return viewAndIncomingMessagesRDD;
     }
 
     /////////////////
     // MAP REDUCE //
     ////////////////
 
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessagesRDD) {
+        return (null == viewAndMessagesRDD) ?
+                graphRDD.mapValues(vertexWritable -> {
+                    vertexWritable.get().edges(Direction.BOTH).forEachRemaining(Edge::remove);
+                    return vertexWritable;
+                }) :
+                graphRDD.leftOuterJoin(viewAndMessagesRDD)
+                        .mapValues(tuple -> {
+                            final Vertex vertex = tuple._1().get();
+                            vertex.edges(Direction.BOTH).forEachRemaining(Edge::remove);
+                            final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get()._1() : Collections.emptyList();
+                            view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
+                            return tuple._1();
+                        });
+    }
+
     public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
             workerMapReduce.workerStart(MapReduce.Stage.MAP);
             final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
-            return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
-                workerMapReduce.map(keyValue._2().get(), mapEmitter);
+            return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
+                workerMapReduce.map(vertexWritable._2().get(), mapEmitter);
                 if (!partitionIterator.hasNext())
                     workerMapReduce.workerEnd(MapReduce.Stage.MAP);
                 return mapEmitter.getEmissions();
@@ -222,7 +248,7 @@ public final class SparkExecutor {
                 if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
                     mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
                 else
-                    HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+                    mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
             } catch (final IOException e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index f09b649..8a1278a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -42,9 +42,6 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.slf4j.Logger;
@@ -52,7 +49,6 @@ import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -167,10 +163,10 @@ public final class SparkGraphComputer implements GraphComputer {
                                 (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                 NullWritable.class,
                                 VertexWritable.class)
-                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // TODO: use DetachedVertex?
                                 .reduceByKey((a, b) -> a) // TODO: test without doing this reduce
                                 .cache(); // partition the graph across the cluster
-                        JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Object>>> viewAndMessageRDD = null;
+                        JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Object>>> viewAndMessagesRDD = null;
 
                         ////////////////////////////////
                         // process the vertex program //
@@ -188,7 +184,7 @@ public final class SparkGraphComputer implements GraphComputer {
                             // execute the vertex program
                             while (true) {
                                 memory.setInTask(true);
-                                viewAndMessageRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewAndMessageRDD, memory, vertexProgramConfiguration);
+                                viewAndMessagesRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewAndMessagesRDD, memory, vertexProgramConfiguration);
                                 memory.setInTask(false);
                                 if (this.vertexProgram.terminate(memory))
                                     break;
@@ -209,21 +205,8 @@ public final class SparkGraphComputer implements GraphComputer {
                         //////////////////////////////
                         if (!this.mapReducers.isEmpty()) {
                             // drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
-                            final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = null == viewAndMessageRDD ?  // TODO: move to SparkExecutor
-                                    graphRDD.mapValues(vertexWritable -> {
-                                        vertexWritable.get().edges(Direction.BOTH).forEachRemaining(Edge::remove);
-                                        return vertexWritable;
-                                    })
-                                            .cache() :
-                                    graphRDD.leftOuterJoin(viewAndMessageRDD)
-                                            .mapValues(tuple -> {
-                                                final Vertex vertex = tuple._1().get();
-                                                vertex.edges(Direction.BOTH).forEachRemaining(Edge::remove);
-                                                final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get()._1() : Collections.emptyList();
-                                                view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
-                                                return tuple._1();
-                                            })
-                                            .cache();
+                            final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewAndMessagesRDD).cache();
+                            graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
 
                             for (final MapReduce mapReduce : this.mapReducers) {
                                 // execute the map reduce job
@@ -274,7 +257,43 @@ public final class SparkGraphComputer implements GraphComputer {
     @Override
     public Features features() {
         return new Features() {
-            @Override
+
+            public boolean supportsVertexAddition() {
+                return false;
+            }
+
+            public boolean supportsVertexRemoval() {
+                return false;
+            }
+
+            public boolean supportsVertexPropertyRemoval() {
+                return false;
+            }
+
+            public boolean supportsEdgeAddition() {
+                return false;
+            }
+
+            public boolean supportsEdgeRemoval() {
+                return false;
+            }
+
+            public boolean supportsEdgePropertyAddition() {
+                return false;
+            }
+
+            public boolean supportsEdgePropertyRemoval() {
+                return false;
+            }
+
+            public boolean supportsIsolation(final Isolation isolation) {
+                return isolation.equals(Isolation.BSP);
+            }
+
+            public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
+                return persist.equals(Persist.NOTHING) || resultGraph.equals(ResultGraph.NEW);
+            }
+
             public boolean supportsDirectObjects() {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
deleted file mode 100644
index 4e58cf1..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMessagePayload<M> implements Serializable, SparkPayload<M> {
-
-    public final List<M> messages = new ArrayList<>();
-
-    public SparkMessagePayload() {
-
-    }
-
-    public SparkMessagePayload(final M message) {
-        this.messages.add(message);
-    }
-
-    @Override
-    public List<M> getMessages() {
-        return this.messages;
-    }
-
-    @Override
-    public final boolean isVertex() {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
deleted file mode 100644
index 3ec1d5b..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-
-import java.util.List;
-import java.util.stream.Stream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface SparkPayload<M> {
-
-    public default void addMessages(final List<M> otherMessages, final MessageCombiner<M> messageCombiner) {
-        if (null != messageCombiner) {
-            final M message = Stream.concat(this.getMessages().stream(),otherMessages.stream()).reduce(messageCombiner::combine).get();
-            this.getMessages().clear();
-            this.getMessages().add(message);
-        } else {
-            this.getMessages().addAll(otherMessages);
-        }
-    }
-
-    public List<M> getMessages();
-
-    public boolean isVertex();
-
-    public default SparkVertexPayload<M> asVertexPayload() {
-        return (SparkVertexPayload<M>) this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
deleted file mode 100644
index 0610750..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import scala.Tuple2;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M>, Serializable {
-
-    private final VertexWritable vertexWritable;
-    private final List<M> incoming;
-    private List<Tuple2<Object, M>> outgoing;
-
-    public SparkVertexPayload(final Vertex vertex) {
-        this.vertexWritable = new VertexWritable(vertex);
-        this.incoming = new ArrayList<>();
-        this.outgoing = new ArrayList<>();
-    }
-
-    @Override
-    public final boolean isVertex() {
-        return true;
-    }
-
-    @Override
-    public SparkVertexPayload<M> asVertexPayload() {
-        return this;
-    }
-
-    @Override
-    public List<M> getMessages() {
-        return this.incoming;
-    }
-
-    public Vertex getVertex() {
-        return this.vertexWritable.get();
-    }
-
-    public VertexWritable getVertexWritable() {
-        return this.vertexWritable;
-    }
-
-    public List<Tuple2<Object, M>> getOutgoingMessages() {
-        return this.outgoing;
-    }
-
-    public Iterator<Tuple2<Object, M>> detachOutgoingMessages() {
-        final Iterator<Tuple2<Object, M>> messages = this.outgoing.iterator();
-        this.outgoing = new ArrayList<>();
-        return messages;
-    }
-
-    /*public Iterator<M> detachIncomingMessages() {
-        final Iterator<M> messages = this.incoming.iterator();
-        this.incoming = new ArrayList<>();
-        return messages;
-    }*/
-
-    ///////////
-
-    @Override
-    public Iterator<M> receiveMessages(final MessageScope messageScope) {
-        return this.incoming.iterator();
-    }
-
-    @Override
-    public void sendMessage(final MessageScope messageScope, final M message) {
-        if (messageScope instanceof MessageScope.Local) {
-            final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
-            final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkVertexPayload.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertexWritable.get());
-            final Direction direction = SparkVertexPayload.getOppositeDirection(incidentTraversal);
-            incidentTraversal.forEachRemaining(edge -> this.outgoing.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
-        } else {
-            ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoing.add(new Tuple2<>(v.id(), message)));
-        }
-    }
-
-    ///////////
-
-    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
-        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
-        return (T) incidentTraversal;
-    }
-
-    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
-        final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
-        return step.getDirection().opposite();
-    }
-}