Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/03/27 12:13:30 UTC

[06/13] incubator-tinkerpop git commit: so deep in a Spark pit it's unreal.

so deep in a Spark pit it's unreal.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ff160d8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ff160d8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ff160d8a

Branch: refs/heads/variables
Commit: ff160d8a5d550140b4bbd42a156dbaf1514b2a91
Parents: aded974
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Mar 26 14:47:10 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Mar 26 14:47:23 2015 -0600

----------------------------------------------------------------------
 .../gremlin/util/iterator/IteratorUtils.java    |  33 ++++
 .../process/computer/spark/SparkExecutor.java   | 196 +++++++++++++++++++
 .../computer/spark/SparkGraphComputer.java      |  15 +-
 .../process/computer/spark/SparkPayload.java    |  14 +-
 .../computer/spark/SparkVertexPayload.java      |  19 +-
 .../computer/spark/util/SparkHelper.java        | 196 -------------------
 6 files changed, 262 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
index ef9bd72..0ef038f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.FastNoSuchElementException
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -234,6 +235,38 @@ public final class IteratorUtils {
 
     ///////////////////
 
+    public static final <S, E> Iterator<E> flatMap(final Iterator<S> iterator, final Function<S, Iterator<E>> function) {
+        return new Iterator<E>() {
+
+            private Iterator<E> currentIterator = Collections.emptyIterator();
+
+            @Override
+            public boolean hasNext() {
+                if (this.currentIterator.hasNext())
+                    return true;
+                else {
+                    while (iterator.hasNext()) {
+                        this.currentIterator = function.apply(iterator.next());
+                        if (this.currentIterator.hasNext())
+                            return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public E next() {
+                if (this.hasNext())
+                    return this.currentIterator.next();
+                else
+                    throw FastNoSuchElementException.instance();
+            }
+        };
+    }
+
+
+    ///////////////////
+
     public static final <S> Iterator<S> concat(final Iterator<S>... iterators) {
         final MultiIterator<S> iterator = new MultiIterator<>();
         for (final Iterator<S> itty : iterators) {

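For context, a minimal usage sketch of the new IteratorUtils.flatMap added above: it lazily flattens an Iterator<S> into an Iterator<E> by applying the function to each element and skipping empty inner iterators. The FlatMapDemo class and the nested-list input below are illustrative only (not part of this commit), and the sketch assumes the function parameter is a java.util.function.Function, consistent with the rest of gremlin-core.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

    public class FlatMapDemo {
        public static void main(final String[] args) {
            final Iterator<List<Integer>> nested = Arrays.asList(
                    Arrays.asList(1, 2),
                    Arrays.<Integer>asList(),   // empty inner iterators are skipped by hasNext()
                    Arrays.asList(3)).iterator();
            // flatten Iterator<List<Integer>> into Iterator<Integer>
            final Iterator<Integer> flat = IteratorUtils.flatMap(nested, List::iterator);
            while (flat.hasNext())
                System.out.println(flat.next());   // prints 1, 2, 3
        }
    }
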
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
new file mode 100644
index 0000000..d9a2537
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessagePayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkPayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkVertexPayload;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkExecutor {
+
+    private SparkExecutor() {
+    }
+
+    public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+        JavaPairRDD<Object, SparkPayload<M>> current = graphRDD;
+        // execute vertex program
+        current = current.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+            final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+            final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
+            workerVertexProgram.workerIterationStart(memory);
+            final List<Tuple2<Object, SparkPayload<M>>> emission = new ArrayList<>();
+            partitionIterator.forEachRemaining(keyValue -> {
+                keyValue._2().asVertexPayload().getOutgoingMessages().clear();
+                workerVertexProgram.execute(ComputerGraph.of(keyValue._2().asVertexPayload().getVertex(), elementComputeKeys), keyValue._2().asVertexPayload(), memory);
+                emission.add(keyValue);
+            });
+            workerVertexProgram.workerIterationEnd(memory);
+            return emission;
+        });
+
+        // emit messages by appending them to the graph as message payloads
+        current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> {
+            keyValue._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
+            final List<Tuple2<Object, SparkPayload<M>>> list = new ArrayList<>();
+            list.add(keyValue);    // this is a vertex
+            keyValue._2().asVertexPayload().getOutgoingMessages().forEach(message -> list.add(new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2())))); // this is a message
+            return list;
+        });
+
+        // "message pass" by merging the message payloads with the vertex payloads
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
+        current = current.reduceByKey((payloadA, payloadB) -> {
+            if (payloadA.isVertex()) {
+                final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadA.asVertexPayload().getVertex());
+                vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
+                vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
+                return vertexPayload;
+            } else if (payloadB.isVertex()) {
+                final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadB.asVertexPayload().getVertex());
+                vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
+                vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
+                return vertexPayload;
+            } else {
+                final SparkMessagePayload<M> messagePayload = new SparkMessagePayload<>();
+                messagePayload.addMessages(payloadA.getMessages(), messageCombiner);
+                messagePayload.addMessages(payloadB.getMessages(), messageCombiner);
+                return messagePayload;
+            }
+        });
+
+        // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably because of concurrent modification issues prior to reduceByKey)
+        current = current.mapValues(vertexPayload -> {
+            vertexPayload.asVertexPayload().getOutgoingMessages().clear();
+            return vertexPayload;
+        });
+
+        return current;
+    }
+
+    public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
+            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
+            workerMapReduce.workerStart(MapReduce.Stage.MAP);
+            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
+            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().getVertex(), mapEmitter));
+            workerMapReduce.workerEnd(MapReduce.Stage.MAP);
+            return mapEmitter.getEmissions();
+        });
+        if (mapReduce.getMapKeySort().isPresent())
+            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
+        return mapRDD;
+    }
+
+    // TODO: public static executeCombine()  is this necessary?
+
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
+            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
+            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
+            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
+            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
+            workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
+            return reduceEmitter.getEmissions();
+        });
+        if (mapReduce.getReduceKeySort().isPresent())
+            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
+        return reduceRDD;
+    }
+
+    public static void deleteOutputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        if (null != outputLocation) {
+            try {
+                FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+    }
+
+    public static String getInputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        try {
+            return FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public static <M> void saveGraphRDD(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a <nullwritable,vertexwritable> stream for output
+            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable(tuple._2().asVertexPayload().getVertex())))
+                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
+                            NullWritable.class,
+                            VertexWritable.class,
+                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
+        }
+    }
+
+    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a Hadoop stream for output
+            mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+                    ObjectWritable.class,
+                    ObjectWritable.class,
+                    (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
+            // if it's not a SequenceFile there is no certain way to convert to the necessary Java objects.
+            // to get results you have to look through HDFS directory structure. Oh the horror.
+            try {
+                if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
+                    mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+                else
+                    HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+    }
+}

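To make the reduceByKey merge in executeStep easier to follow, here is a standalone plain-Java sketch of the same rule, with no Spark involved: for a given vertex id, whichever payload carries the vertex absorbs the messages of both sides, and two pure message payloads simply pool their messages. The Payload class below is a simplified stand-in for the SparkPayload hierarchy (and it omits the optional MessageCombiner the real code applies), not the commit's actual types.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MergeSketch {
        static class Payload {
            final String vertex;                       // null for a pure message payload
            final List<Long> messages = new ArrayList<>();
            Payload(final String vertex, final Long... messages) {
                this.vertex = vertex;
                this.messages.addAll(Arrays.asList(messages));
            }
            boolean isVertex() { return null != this.vertex; }
        }

        // the merge rule applied pairwise by reduceByKey
        static Payload merge(final Payload a, final Payload b) {
            final Payload result = new Payload(a.isVertex() ? a.vertex : b.vertex);
            result.messages.addAll(a.messages);
            result.messages.addAll(b.messages);
            return result;
        }

        public static void main(final String[] args) {
            final Payload vertex = new Payload("v[1]");
            final Payload messageA = new Payload(null, 7L);
            final Payload messageB = new Payload(null, 3L);
            // regardless of reduction order, the vertex ends up holding all messages
            final Payload merged = merge(merge(messageA, vertex), messageB);
            System.out.println(merged.vertex + " " + merged.messages);   // v[1] [7, 3]
        }
    }
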
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 514a811..f43727f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -29,7 +29,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SparkHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -147,14 +146,14 @@ public final class SparkGraphComputer implements GraphComputer {
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
                     final long startTime = System.currentTimeMillis();
                     SparkMemory memory = null;
-                    SparkHelper.deleteOutputLocation(hadoopConfiguration);
+                    SparkExecutor.deleteOutputLocation(hadoopConfiguration);
 
                     // wire up a spark context
                     final SparkConf sparkConfiguration = new SparkConf();
                     sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
                     hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                     if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
-                        hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, SparkHelper.getInputLocation(hadoopConfiguration)); // necessary for Spark and newAPIHadoopRDD
+                        hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, SparkExecutor.getInputLocation(hadoopConfiguration)); // necessary for Spark and newAPIHadoopRDD
                     // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
                     try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
                         // add the project jars to the cluster
@@ -183,7 +182,7 @@ public final class SparkGraphComputer implements GraphComputer {
                             // execute the vertex program
                             while (true) {
                                 memory.setInTask(true);
-                                graphRDD = SparkHelper.executeStep(graphRDD, memory, vertexProgramConfiguration);
+                                graphRDD = SparkExecutor.executeStep(graphRDD, memory, vertexProgramConfiguration);
                                 graphRDD.foreachPartition(iterator -> doNothing()); // TODO: i think this is a fast way to execute the rdd (wish there was a "execute()" method).
                                 memory.setInTask(false);
                                 if (this.vertexProgram.terminate(memory))
@@ -195,7 +194,7 @@ public final class SparkGraphComputer implements GraphComputer {
                             }
                             // write the output graph back to disk
                             if (!this.persist.get().equals(Persist.NOTHING))
-                                SparkHelper.saveGraphRDD(graphRDD, hadoopConfiguration);
+                                SparkExecutor.saveGraphRDD(graphRDD, hadoopConfiguration);
                         }
 
                         final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
@@ -215,12 +214,12 @@ public final class SparkGraphComputer implements GraphComputer {
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                                 mapReduce.storeState(newApacheConfiguration);
                                 // map
-                                final JavaPairRDD mapRDD = SparkHelper.executeMap((JavaPairRDD) graphRDD, mapReduce, newApacheConfiguration);
+                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) graphRDD, mapReduce, newApacheConfiguration);
                                 // combine TODO? is this really needed
                                 // reduce
-                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
                                 // write the map reduce output back to disk (memory)
-                                SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+                                SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                             }
                         }
                         // close the context or else bad things happen // TODO: does this happen automatically cause of the try(resource) {} block?

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
index 11f98bb..ebb4b18 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
@@ -21,8 +21,6 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -31,11 +29,15 @@ public interface SparkPayload<M> {
 
     public default void addMessages(final List<M> otherMessages, final MessageCombiner<M> messageCombiner) {
         if (null != messageCombiner) {
-            final Optional<M> combinedMessage = Stream.concat(this.getMessages().stream(), otherMessages.stream()).reduce(messageCombiner::combine);
-            if (combinedMessage.isPresent()) {
-                this.getMessages().clear();
-                this.getMessages().add(combinedMessage.get());
+            M message = null;
+            for (final M m : this.getMessages()) {
+                message = null == message ? m : messageCombiner.combine(message, m);
             }
+            for (final M m : otherMessages) {
+                message = null == message ? m : messageCombiner.combine(message, m);
+            }
+            this.getMessages().clear();
+            if (null != message) this.getMessages().add(message);
         } else {
             this.getMessages().addAll(otherMessages);
         }

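The rewritten addMessages above replaces the Stream.concat/reduce pass with an explicit fold over both message lists, then stores the single combined message. A small standalone sketch of that fold, assuming a simple summing combiner; the Combiner interface here is a local stand-in, not TinkerPop's MessageCombiner.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CombineSketch {
        interface Combiner<M> {
            M combine(M a, M b);
        }

        public static void main(final String[] args) {
            final Combiner<Long> sum = (a, b) -> a + b;
            final List<Long> messages = new ArrayList<>(Arrays.asList(1L, 2L));
            final List<Long> otherMessages = Arrays.asList(3L, 4L);
            // fold the existing and the incoming messages into a single combined message
            Long combined = null;
            for (final Long m : messages)
                combined = null == combined ? m : sum.combine(combined, m);
            for (final Long m : otherMessages)
                combined = null == combined ? m : sum.combine(combined, m);
            messages.clear();
            if (null != combined) messages.add(combined);
            System.out.println(messages);   // [10]
        }
    }
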
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
index 30563d6..2ac67e6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
@@ -32,6 +32,7 @@ import scala.Tuple2;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -41,7 +42,7 @@ public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M
 
     private final VertexWritable vertexWritable;
     private final List<M> incoming;
-    private final List<Tuple2<Object, M>> outgoing;
+    private List<Tuple2<Object, M>> outgoing;
 
     public SparkVertexPayload(final Vertex vertex) {
         this.vertexWritable = new VertexWritable(vertex);
@@ -68,10 +69,26 @@ public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M
         return this.vertexWritable.get();
     }
 
+    public VertexWritable getVertexWritable() {
+        return this.vertexWritable;
+    }
+
     public List<Tuple2<Object, M>> getOutgoingMessages() {
         return this.outgoing;
     }
 
+    public Iterator<Tuple2<Object, M>> detachOutgoingMessages() {
+        final Iterator<Tuple2<Object, M>> messages = this.outgoing.iterator();
+        this.outgoing = new ArrayList<>();
+        return messages;
+    }
+
+    /*public Iterator<M> detachIncomingMessages() {
+        final Iterator<M> messages = this.incoming.iterator();
+        this.incoming = new ArrayList<>();
+        return messages;
+    }*/
+
     ///////////
 
     @Override

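The new detachOutgoingMessages above hands back an iterator over the buffered outgoing messages and swaps in a fresh list, so a caller can drain the old messages while the payload starts accumulating new ones. A minimal standalone sketch of that pattern; DetachSketch and the String messages are illustrative, not the commit's types.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class DetachSketch {
        private List<String> outgoing = new ArrayList<>(Arrays.asList("a", "b"));

        public Iterator<String> detachOutgoingMessages() {
            final Iterator<String> messages = this.outgoing.iterator();
            this.outgoing = new ArrayList<>();   // start over with an empty buffer
            return messages;
        }

        public static void main(final String[] args) {
            final DetachSketch payload = new DetachSketch();
            final Iterator<String> drained = payload.detachOutgoingMessages();
            drained.forEachRemaining(System.out::println);     // a, b
            System.out.println(payload.outgoing.isEmpty());    // true
        }
    }
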
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
deleted file mode 100644
index 7696f3a..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessagePayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkVertexPayload;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkHelper {
-
-    private SparkHelper() {
-    }
-
-    public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
-        JavaPairRDD<Object, SparkPayload<M>> current = graphRDD;
-        // execute vertex program
-        current = current.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
-            final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
-            final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
-            workerVertexProgram.workerIterationStart(memory);
-            final List<Tuple2<Object, SparkPayload<M>>> emission = new ArrayList<>();
-            partitionIterator.forEachRemaining(keyValue -> {
-                keyValue._2().asVertexPayload().getOutgoingMessages().clear();
-                workerVertexProgram.execute(ComputerGraph.of(keyValue._2().asVertexPayload().getVertex(), elementComputeKeys), keyValue._2().asVertexPayload(), memory);
-                emission.add(keyValue);
-            });
-            workerVertexProgram.workerIterationEnd(memory);
-            return emission;
-        });
-
-        // emit messages by appending them to the graph as message payloads
-        current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> {
-            keyValue._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
-            final List<Tuple2<Object, SparkPayload<M>>> list = new ArrayList<>();
-            list.add(keyValue);    // this is a vertex
-            keyValue._2().asVertexPayload().getOutgoingMessages().forEach(message -> list.add(new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2())))); // this is a message
-            return list;
-        });
-
-        // "message pass" by merging the message payloads with the vertex payloads
-        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
-        current = current.reduceByKey((payloadA, payloadB) -> {
-            if (payloadA.isVertex()) {
-                final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadA.asVertexPayload().getVertex());
-                vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
-                vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
-                return vertexPayload;
-            } else if (payloadB.isVertex()) {
-                final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadB.asVertexPayload().getVertex());
-                vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
-                vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
-                return vertexPayload;
-            } else {
-                final SparkMessagePayload<M> messagePayload = new SparkMessagePayload<>();
-                messagePayload.addMessages(payloadA.getMessages(), messageCombiner);
-                messagePayload.addMessages(payloadB.getMessages(), messageCombiner);
-                return messagePayload;
-            }
-        });
-
-        // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably cause of concurrent modification issues prior to reduceByKey)
-        current = current.mapValues(vertexPayload -> {
-            vertexPayload.asVertexPayload().getOutgoingMessages().clear();
-            return vertexPayload;
-        });
-
-        return current;
-    }
-
-    public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
-            workerMapReduce.workerStart(MapReduce.Stage.MAP);
-            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
-            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().getVertex(), mapEmitter));
-            workerMapReduce.workerEnd(MapReduce.Stage.MAP);
-            return mapEmitter.getEmissions();
-        });
-        if (mapReduce.getMapKeySort().isPresent())
-            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
-        return mapRDD;
-    }
-
-    // TODO: public static executeCombine()  is this necessary?
-
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
-            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
-            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
-            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
-            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
-            workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
-            return reduceEmitter.getEmissions();
-        });
-        if (mapReduce.getReduceKeySort().isPresent())
-            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
-        return reduceRDD;
-    }
-
-    public static void deleteOutputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-        if (null != outputLocation) {
-            try {
-                FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-    }
-
-    public static String getInputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        try {
-            return FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    public static <M> void saveGraphRDD(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null != outputLocation) {
-            // map back to a <nullwritable,vertexwritable> stream for output
-            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable(tuple._2().asVertexPayload().getVertex())))
-                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
-                            NullWritable.class,
-                            VertexWritable.class,
-                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
-        }
-    }
-
-    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null != outputLocation) {
-            // map back to a Hadoop stream for output
-            mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
-                    ObjectWritable.class,
-                    ObjectWritable.class,
-                    (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
-            // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
-            // to get results you have to look through HDFS directory structure. Oh the horror.
-            try {
-                if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
-                    mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
-                else
-                    HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-    }
-}