You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/31 23:21:50 UTC

incubator-tinkerpop git commit: added new SparkGraphComputer message passing algorithm that caches the graph structure and has dynamic views and messages.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 1d3e1cc7e -> 63959c4ac


added new SparkGraphComputer message passing algorithm that caches the graph structure and has dynamic views and messages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/63959c4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/63959c4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/63959c4a

Branch: refs/heads/master
Commit: 63959c4acfefd68dec42ce290f843d30a690df35
Parents: 1d3e1cc
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 31 15:21:29 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 31 15:21:46 2015 -0600

----------------------------------------------------------------------
 .../util/detached/DetachedVertexProperty.java   |   6 +
 .../process/computer/GraphComputerTest.java     |   2 +-
 .../process/computer/spark/SparkExecutor.java   | 166 ++++++++-----------
 .../computer/spark/SparkGraphComputer.java      |  38 +++--
 .../process/computer/spark/SparkMessenger.java  |   6 +-
 5 files changed, 106 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java
index fd8fd9f..253b647 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java
@@ -126,4 +126,10 @@ public class DetachedVertexProperty<V> extends DetachedElement<Property<V>> impl
     public <U> Iterator<Property<U>> properties(final String... propertyKeys) {
         return (Iterator) super.properties(propertyKeys);
     }
+
+    public static <V> VertexProperty<V> addTo(final Vertex vertex, final DetachedVertexProperty<V> detachedVertexProperty) {
+        final VertexProperty<V> vertexProperty = vertex.property(detachedVertexProperty.key(), detachedVertexProperty.value());
+        detachedVertexProperty.properties().forEachRemaining(property -> vertexProperty.property(property.key(), property.value()));
+        return vertexProperty;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 96ab10d..e2708b1 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -597,7 +597,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public Set<String> getElementComputeKeys() {
-            return null;
+            return Collections.emptySet();
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 03de47a..fbbc9d0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
-import com.google.common.base.Optional;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +36,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -58,121 +56,93 @@ public final class SparkExecutor {
     private SparkExecutor() {
     }
 
-    public static <M> JavaPairRDD<Object, SparkVertexPayload<M>> executeVertexProgramIteration(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+    public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessageRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+
         // execute vertex program iteration
-        final JavaPairRDD<Object, SparkVertexPayload<M>> verticesWithOutgoingMessages = graphRDD
-                .mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndOutgoingMessagesRDD = null == viewAndMessageRDD ?
+                graphRDD.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
                     final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
+                    final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
                     workerVertexProgram.workerIterationStart(memory);
-                    return () -> IteratorUtils.map(partitionIterator, vertex -> {
-                        vertex._2().getOutgoingMessages().clear(); // there should be no outgoing messages at this point
-                        workerVertexProgram.execute(ComputerGraph.of(vertex._2().getVertex(), elementComputeKeys), vertex._2(), memory);
+                    return () -> IteratorUtils.map(partitionIterator, vertexWritable -> {
+                        final Vertex vertex = vertexWritable._2().get();
+                        final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, Collections.emptyList());
+                        workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory);
+                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages();
+                        final List<DetachedVertexProperty<Object>> newView = new ArrayList<>();
+                        if (null != elementComputeKeysArray)
+                            vertex.properties(elementComputeKeysArray).forEachRemaining(property -> newView.add(DetachedFactory.detach(property, true)));
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory);
-                        vertex._2().getMessages().clear(); // there should be no incoming messages at this point (only outgoing messages)
-                        return vertex;
+                        return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
                     });
-                }).cache();
+                }) :
+                graphRDD.leftOuterJoin(viewAndMessageRDD)
+                        .mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+                            final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+                            final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
+                            final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
+                            workerVertexProgram.workerIterationStart(memory);
+                            return () -> IteratorUtils.map(partitionIterator, vertexWritableAndIncomingMessages -> {
+                                final Vertex vertex = vertexWritableAndIncomingMessages._2()._1().get();
+                                final List<M> incomingMessages = vertexWritableAndIncomingMessages._2()._2().isPresent() ? vertexWritableAndIncomingMessages._2()._2().get()._2() : Collections.emptyList();
+                                final List<DetachedVertexProperty<Object>> view = vertexWritableAndIncomingMessages._2()._2().isPresent() ? vertexWritableAndIncomingMessages._2()._2().get()._1() : Collections.emptyList();
+                                view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
+                                final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages);
+                                workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory);
+                                final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages();
+                                final List<DetachedVertexProperty<Object>> newView = new ArrayList<>();
+                                if (null != elementComputeKeysArray)
+                                    vertex.properties(elementComputeKeysArray).forEachRemaining(property -> newView.add(DetachedFactory.detach(property, true)));
+                                if (!partitionIterator.hasNext())
+                                    workerVertexProgram.workerIterationEnd(memory);
+                                return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
+                            });
+                        });
+        viewAndOutgoingMessagesRDD.cache();
 
         // "message pass" by reducing on the vertex object id of the message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, SparkMessagePayload<M>> incomingMessages = verticesWithOutgoingMessages
-                .flatMapToPair(vertexPayload -> () ->
-                        IteratorUtils.map(vertexPayload._2().detachOutgoingMessages(), // this removes all outgoing messages once they have been iterated by this step
-                                message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2()))))
-                .reduceByKey((messagePayloadA, messagePayloadB) -> {
-                    messagePayloadA.addMessages(messagePayloadB.getMessages(), messageCombiner);
-                    return messagePayloadA;
-                });
-
-        // join the incoming messages with the vertices
-        final JavaPairRDD<Object, SparkVertexPayload<M>> verticesWithIncomingMessages = verticesWithOutgoingMessages
-                .leftOuterJoin(incomingMessages)
-                .mapValues(tuple -> {
-                    final SparkVertexPayload<M> vertexPayload = tuple._1();
-                    final SparkMessagePayload<M> messagePayload = tuple._2().orNull();
-                    if (null != messagePayload) {
-                        vertexPayload.getOutgoingMessages().clear();  // there should be no outgoing messages at this point (just to be safe)
-                        vertexPayload.getMessages().clear();          // there should be no incoming messages at this point (just to be safe)
-                        vertexPayload.getMessages().addAll(messagePayload.getMessages());
+        final JavaPairRDD<Object, List<M>> incomingMessages = viewAndOutgoingMessagesRDD
+                .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2()._2().iterator(), x -> {
+                    final List<M> list = new ArrayList<>();
+                    list.add(x._2());
+                    return new Tuple2<>(x._1(), list);
+                })).reduceByKey((a, b) -> {
+                    if (null == messageCombiner) {
+                        a.addAll(b);
+                        return a;
+                    } else {
+                        final M m = messageCombiner.combine(a.get(0),b.get(0));
+                        a.clear();
+                        b.clear();
+                        a.add(m);
+                        return a;
                     }
-                    return vertexPayload;
                 });
 
+        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> newViewMessageRDD = viewAndOutgoingMessagesRDD
+                .mapValues(Tuple2::_1)
+                .fullOuterJoin(incomingMessages)
+                .mapValues(tuple -> new Tuple2<>(tuple._1().or(Collections.emptyList()), tuple._2().or(Collections.emptyList())));
 
-        verticesWithIncomingMessages.foreachPartition(partitionIterator -> {
+        newViewMessageRDD.foreachPartition(partitionIterator -> {
         }); // need to complete a task so its BSP.
-        return verticesWithIncomingMessages;
+        return newViewMessageRDD;
     }
 
-    //////////////////////////////
-    /////DEMO ALGORITHM /////////
-    /////////////////////////////
-    /////////////////////////////
-
-    public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration2(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewMessageRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
-        final JavaPairRDD<Object, Tuple2<VertexWritable, Optional<Tuple2<List<DetachedVertexProperty<Object>>, List<M>>>>> graphViewMessagesRDD = graphRDD.leftOuterJoin(viewMessageRDD);
+    /////////////////
+    // MAP REDUCE //
+    ////////////////
 
-        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndMessagesRDD = graphViewMessagesRDD.mapPartitionsToPair(partitions -> {
-            final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
-            workerVertexProgram.workerIterationStart(memory);
-            final SparkMessenger<M> messenger = new SparkMessenger<>();
-            final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
-            final String[] elementComputeKeysArray = elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
-            return () -> IteratorUtils.map(partitions, graphViewMessages -> {
-                final Vertex vertex = graphViewMessages._2()._1().get();
-                final List<DetachedVertexProperty<Object>> view = graphViewMessages._2()._2().isPresent() ? graphViewMessages._2()._2().get()._1() : Collections.emptyList();
-                final List<M> incomingMessages = graphViewMessages._2()._2().isPresent() ? graphViewMessages._2()._2().get()._2() : Collections.emptyList();
-                view.forEach(property -> property.attach(vertex));
-                messenger.setVertexAndMessages(vertex, incomingMessages);
-                memory.setInTask(true);
-                workerVertexProgram.execute(vertex, messenger, memory);
-                memory.setInTask(false);
-                final List<DetachedVertexProperty<Object>> properties = IteratorUtils.list(IteratorUtils.<VertexProperty<Object>, DetachedVertexProperty<Object>>map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
-                vertex.properties(elementComputeKeysArray).forEachRemaining(VertexProperty::remove);
-                if (!partitions.hasNext())
-                    workerVertexProgram.workerIterationEnd(memory);
-                return new Tuple2<>(vertex.id(), new Tuple2<>(properties, messenger.getOutgoingMessages()));
-            });
-        });
-
-        final JavaPairRDD<Object, List<DetachedVertexProperty<Object>>> newViewRDD = viewAndMessagesRDD.mapValues(Tuple2::_1);
-
-        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, List<M>> newMessagesRDD = viewAndMessagesRDD
-                .flatMapToPair(viewAndMessages -> () -> viewAndMessages._2()._2().iterator())
-                .mapValues(message -> {
-                    final List<M> list = new ArrayList<>(1);
-                    list.add(message);
-                    return list;
-                }).reduceByKey((messageA, messageB) -> {
-                    if (null != messageCombiner) {
-                        final M message = Stream.concat(messageA.stream(), messageB.stream()).reduce(messageCombiner::combine).get();
-                        messageA.clear();
-                        messageA.add(message);
-                        return messageA;
-                    } else {
-                        messageA.addAll(messageB);
-                        return messageA;
-                    }
-                });
-
-        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> newViewMessages = newViewRDD.join(newMessagesRDD);
-
-        newViewMessages.foreachPartition(x -> {
-        }); // execute the view
-
-        return newViewMessages;
-    }
-
-    public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
             workerMapReduce.workerStart(MapReduce.Stage.MAP);
             final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
             return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
-                workerMapReduce.map(keyValue._2().getVertex(), mapEmitter);
+                workerMapReduce.map(keyValue._2().get(), mapEmitter);
                 if (!partitionIterator.hasNext())
                     workerMapReduce.workerEnd(MapReduce.Stage.MAP);
                 return mapEmitter.getEmissions();
@@ -202,6 +172,10 @@ public final class SparkExecutor {
         return reduceRDD;
     }
 
+    ///////////////////
+    // Input/Output //
+    //////////////////
+
     public static void deleteOutputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
         final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
         if (null != outputLocation) {
@@ -221,11 +195,11 @@ public final class SparkExecutor {
         }
     }
 
-    public static <M> void saveGraphRDD(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+    public static <M> void saveGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
         final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         if (null != outputLocation) {
             // map back to a <nullwritable,vertexwritable> stream for output
-            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2().getVertexWritable()))
+            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
                     .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
                             NullWritable.class,
                             VertexWritable.class,

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index e5388c9..94eb6a6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -44,13 +44,17 @@ import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -159,12 +163,14 @@ public final class SparkGraphComputer implements GraphComputer {
                         // add the project jars to the cluster
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
                         // create a message-passing friendly rdd from the hadoop input format
-                        JavaPairRDD<Object, SparkVertexPayload<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                        final JavaPairRDD<Object, VertexWritable> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                 (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                 NullWritable.class,
                                 VertexWritable.class)
-                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(tuple._2().get())))
-                                .reduceByKey((a, b) -> a); // partition the graph across the cluster
+                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+                                .reduceByKey((a, b) -> a) // TODO: test without doing this reduce
+                                .cache(); // partition the graph across the cluster
+                        JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Object>>> viewAndMessageRDD = null;
 
                         ////////////////////////////////
                         // process the vertex program //
@@ -182,7 +188,7 @@ public final class SparkGraphComputer implements GraphComputer {
                             // execute the vertex program
                             while (true) {
                                 memory.setInTask(true);
-                                graphRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, memory, vertexProgramConfiguration);
+                                viewAndMessageRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewAndMessageRDD, memory, vertexProgramConfiguration);
                                 memory.setInTask(false);
                                 if (this.vertexProgram.terminate(memory))
                                     break;
@@ -203,18 +209,28 @@ public final class SparkGraphComputer implements GraphComputer {
                         //////////////////////////////
                         if (!this.mapReducers.isEmpty()) {
                             // drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
-                            graphRDD = graphRDD.mapValues(vertex -> {
-                                vertex.getMessages().clear();
-                                vertex.getOutgoingMessages().clear();
-                                vertex.getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
-                                return vertex;
-                            }).cache();
+                            final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = null == viewAndMessageRDD ?
+                                    graphRDD.mapValues(vertexWritable -> {
+                                        vertexWritable.get().edges(Direction.BOTH).forEachRemaining(Edge::remove);
+                                        return vertexWritable;
+                                    })
+                                            .cache() :
+                                    graphRDD.leftOuterJoin(viewAndMessageRDD)
+                                            .mapValues(tuple -> {
+                                                final Vertex vertex = tuple._1().get();
+                                                vertex.edges(Direction.BOTH).forEachRemaining(Edge::remove);
+                                                final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get()._1() : Collections.emptyList();
+                                                view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
+                                                return tuple._1();
+                                            })
+                                            .cache();
+
                             for (final MapReduce mapReduce : this.mapReducers) {
                                 // execute the map reduce job
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                                 mapReduce.storeState(newApacheConfiguration);
                                 // map
-                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) graphRDD, mapReduce, newApacheConfiguration);
+                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
                                 // combine TODO? is this really needed
                                 // reduce
                                 final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 4b174e1..49847de 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -39,20 +39,18 @@ import java.util.List;
 public class SparkMessenger<M> implements Messenger<M> {
 
     private Vertex vertex;
-    private Iterable<M> incomingMessages = new ArrayList<>();
+    private Iterable<M> incomingMessages;
     private final List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
 
-    public void setVertexAndMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
+    public SparkMessenger(final Vertex vertex, final Iterable<M> incomingMessages) {
         this.vertex = vertex;
         this.incomingMessages = incomingMessages;
-        this.outgoingMessages.clear();
     }
 
     public List<Tuple2<Object, M>> getOutgoingMessages() {
         return this.outgoingMessages;
     }
 
-
     @Override
     public Iterator<M> receiveMessages(final MessageScope messageScope) {
         return this.incomingMessages.iterator();