You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/04/15 19:47:54 UTC

[32/50] incubator-tinkerpop git commit: GraphComputer Messenger no longer requires the MessageScope on receiveMessages(). If developers want that type of tagging, that would be built into their message object.

GraphComputer Messenger no longer requires the MessageScope on receiveMessages(). If developers want that type of tagging, that would be built into their message object.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/daec1c3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/daec1c3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/daec1c3b

Branch: refs/heads/variables
Commit: daec1c3b5b6d5ce2404eb7962042fcd6e62e47ef
Parents: 8ab0a23
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Apr 14 14:46:38 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Apr 14 14:46:38 2015 -0600

----------------------------------------------------------------------
 docs/src/the-graphcomputer.asciidoc             |  7 ++-
 .../gremlin/process/computer/MessageScope.java  | 31 ++++++++++++--
 .../gremlin/process/computer/Messenger.java     |  3 +-
 .../peerpressure/PeerPressureVertexProgram.java |  4 +-
 .../ranking/pagerank/PageRankVertexProgram.java |  4 +-
 .../computer/traversal/SingleMessenger.java     |  2 +-
 .../computer/traversal/TraverserExecutor.java   |  2 +-
 .../computer/giraph/GiraphMessenger.java        |  4 +-
 .../process/computer/spark/SparkMessenger.java  |  2 +-
 .../process/computer/TinkerGraphComputer.java   |  2 +-
 .../process/computer/TinkerMessageBoard.java    |  7 +++
 .../process/computer/TinkerMessenger.java       | 45 +++++++++++---------
 12 files changed, 74 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/docs/src/the-graphcomputer.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/the-graphcomputer.asciidoc b/docs/src/the-graphcomputer.asciidoc
index 964fabf..d54b8f2 100644
--- a/docs/src/the-graphcomputer.asciidoc
+++ b/docs/src/the-graphcomputer.asciidoc
@@ -85,7 +85,6 @@ result.memory().get('clusterPopulation')
 g = result.graph().traversal(standard())
 g.V().values(PeerPressureVertexProgram.CLUSTER).groupCount().next()
 g.V().valueMap()
-g.V().valueMap(true,PeerPressureVertexProgram.CLUSTER)
 ----
 
 If there are numerous statistics desired, then its possible to register as many MapReduce jobs as needed. For instance, the `ClusterCountMapReduce` determines how many unique clusters were created by the peer pressure algorithm. Below both `ClusterCountMapReduce` and `ClusterPopulationMapReduce` are computed over the resultant graph.
@@ -189,12 +188,12 @@ public class PageRankVertexProgram implements VertexProgram<Double> { <1>
             messenger.sendMessage(this.countMessageScope, 1.0d);
         } else if (1 == memory.getIteration()) {  <8>
             double initialPageRank = 1.0d / this.vertexCountAsDouble;
-            double edgeCount = IteratorUtils.reduce(messenger.receiveMessages(this.countMessageScope), 0.0d, (a, b) -> a + b);
+            double edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b);
             vertex.property(PAGE_RANK, initialPageRank);
             vertex.property(EDGE_COUNT, edgeCount);
             messenger.sendMessage(this.incidentMessageScope, initialPageRank / edgeCount);
         } else { <9>
-            double newPageRank = IteratorUtils.reduce(messenger.receiveMessages(this.incidentMessageScope), 0.0d, (a, b) -> a + b);
+            double newPageRank = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b);
             newPageRank = (this.alpha * newPageRank) + ((1.0d - this.alpha) / this.vertexCountAsDouble);
             vertex.property(PAGE_RANK, newPageRank);
             messenger.sendMessage(this.incidentMessageScope, newPageRank / vertex.<Double>value(EDGE_COUNT));
@@ -293,5 +292,5 @@ image::gremlin-without-a-cause.png[width=200,float=right]
  . Traversal sideEffects are represented as a distributed data structure across the graph's vertex set. It is not possible to get a global view of a sideEffect until it is aggregated via a <<mapreduce,MapReduce>> job. In some situations, the local vertex representation of the sideEffect is sufficient to ensure the intended semantics of the traversal are respected. However, this is not generally true so be wary of traversals that require global views of a sideEffect.
  . When evaluating traversals that rely on path information (i.e. the history of the traversal), practical computational limits can easily be reached due the link:http://en.wikipedia.org/wiki/Combinatorial_explosion[combinatoric explosion] of data. With path computing enabled, every traverser is unique and thus, must be enumerated as opposed to being counted/merged. The difference being a collection of paths vs. a single 64-bit long at a single vertex. For more information on this concept, please see link:http://thinkaurelius.com/2012/11/11/faunus-provides-big-graph-data-analytics/[Faunus Provides Big Graph Data].
  . When traversals of the form `x.as('a').y.someSideEffectStep('a').z` are evaluated, the `a` object is stored in the path information of the traverser and thus, such traversals (may) turn on path calculations when executed on a GraphComputer.
- . Steps that are concerned with the global ordering of traversers do not have a meaningful representation in OLAP. For example, what does <<order-step,`order()`>>-step mean when all traversers are being processed in parallel? Even if the traversers were aggregated and ordered, then at the next step they would return to being executed in parallel and thus, in an unpredictable order. Other steps of this nature include <<order-step,`order()`>>. When these steps are executed at the end of a traversal (i.e the final step), the `TraverserMapReduce` job ensures the resultant serial representation is ordered accordingly.
+ . Steps that are concerned with the global ordering of traversers do not have a meaningful representation in OLAP. For example, what does <<order-step,`order()`>>-step mean when all traversers are being processed in parallel? Even if the traversers were aggregated and ordered, then at the next step they would return to being executed in parallel and thus, in an unpredictable order. When `order()`-like steps are executed at the end of a traversal (i.e the final step), the `TraverserMapReduce` job ensures the resultant serial representation is ordered accordingly.
  . Steps that are concerned with providing a global aggregate to the next step of computation do not have a correlate in OLAP. For example, <<fold-step,`fold()`>>-step can only fold up the objects at each executing vertex. Next, even if a global fold was possible, where would it go? Which vertex would be the host of the data structure? The `fold()`-step only makes sense as an end-step whereby a MapReduce job can generate the proper global-to-local data reduction.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
index f72f4ee..1304703 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
@@ -18,8 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.process.computer;
 
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
@@ -67,6 +67,16 @@ public abstract class MessageScope {
         public static Global instance() {
             return INSTANCE;
         }
+
+        @Override
+        public int hashCode() {
+            return 4676576;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            return other instanceof Global;
+        }
     }
 
     /**
@@ -82,19 +92,20 @@ public abstract class MessageScope {
     public final static class Local<M> extends MessageScope {
         public final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal;
         public final BiFunction<M, Edge, M> edgeFunction;
+        private final String toStringOfTraversal;
 
         private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
-            this.incidentTraversal = incidentTraversal;
-            this.edgeFunction = (final M m, final Edge e) -> m; // the default is an identity function
+            this(incidentTraversal, (final M m, final Edge e) -> m); // the default is an identity function
         }
 
         private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
             this.incidentTraversal = incidentTraversal;
+            this.toStringOfTraversal = this.incidentTraversal.get().toString();
             this.edgeFunction = edgeFunction;
         }
 
         public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
-            return new Local(incidentTraversal);
+            return new Local<>(incidentTraversal);
         }
 
         public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
@@ -109,6 +120,18 @@ public abstract class MessageScope {
             return this.incidentTraversal;
         }
 
+        @Override
+        public int hashCode() {
+            return this.edgeFunction.hashCode() + this.incidentTraversal.get().toString().hashCode();
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            return other instanceof Local &&
+                    ((Local<?>) other).toStringOfTraversal.equals(this.toStringOfTraversal) &&
+                    ((Local<?>) other).edgeFunction == this.edgeFunction;
+        }
+
         /**
          * A helper class that can be used to generate the reverse traversal of the traversal within a {@link MessageScope.Local}.
          */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
index 3f04069..a6746ba 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
@@ -34,10 +34,9 @@ public interface Messenger<M> {
     /**
      * The currently executing vertex can receive the messages of the provided {@link MessageScope}.
      *
-     * @param messageScope the message scope of the messages to receive
      * @return the messages for that vertex
      */
-    public Iterator<M> receiveMessages(final MessageScope messageScope);
+    public Iterator<M> receiveMessages();
 
     /**
      * The currently executing vertex can send a message with provided {@link MessageScope}.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
index b1b07ed..0a64cf1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
@@ -136,7 +136,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
                 memory.and(VOTE_TO_HALT, false);
             }
         } else if (1 == memory.getIteration() && this.distributeVote) {
-            double voteStrength = 1.0d / IteratorUtils.reduce(IteratorUtils.map(messenger.receiveMessages(this.countScope), Pair::getValue1), 0.0d, (a, b) -> a + b);
+            double voteStrength = 1.0d / IteratorUtils.reduce(IteratorUtils.map(messenger.receiveMessages(), Pair::getValue1), 0.0d, (a, b) -> a + b);
             vertex.property(CLUSTER, vertex.id());
             vertex.property(VOTE_STRENGTH, voteStrength);
             messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
@@ -144,7 +144,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
         } else {
             final Map<Serializable, Double> votes = new HashMap<>();
             votes.put(vertex.value(CLUSTER), vertex.<Double>value(VOTE_STRENGTH));
-            messenger.receiveMessages(this.voteScope).forEachRemaining(message -> MapHelper.incr(votes, message.getValue0(), message.getValue1()));
+            messenger.receiveMessages().forEachRemaining(message -> MapHelper.incr(votes, message.getValue0(), message.getValue1()));
             Serializable cluster = PeerPressureVertexProgram.largestCount(votes);
             if (null == cluster) cluster = (Serializable) vertex.id();
             memory.and(VOTE_TO_HALT, vertex.value(CLUSTER).equals(cluster));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
index 448c560..97529bf 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
@@ -128,12 +128,12 @@ public class PageRankVertexProgram extends StaticVertexProgram<Double> {
             messenger.sendMessage(this.countMessageScope, 1.0d);
         } else if (1 == memory.getIteration()) {
             double initialPageRank = 1.0d / this.vertexCountAsDouble;
-            double edgeCount = IteratorUtils.reduce(messenger.receiveMessages(this.countMessageScope), 0.0d, (a, b) -> a + b);
+            double edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b);
             vertex.property(PAGE_RANK, initialPageRank);
             vertex.property(EDGE_COUNT, edgeCount);
             messenger.sendMessage(this.incidentMessageScope, initialPageRank / edgeCount);
         } else {
-            double newPageRank = IteratorUtils.reduce(messenger.receiveMessages(this.incidentMessageScope), 0.0d, (a, b) -> a + b);
+            double newPageRank = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b);
             newPageRank = (this.alpha * newPageRank) + ((1.0d - this.alpha) / this.vertexCountAsDouble);
             vertex.property(PAGE_RANK, newPageRank);
             messenger.sendMessage(this.incidentMessageScope, newPageRank / vertex.<Double>value(EDGE_COUNT));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
index 953e337..26ed8a4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
@@ -38,7 +38,7 @@ public final class SingleMessenger<M> implements Messenger<M> {
     }
 
     @Override
-    public Iterator<M> receiveMessages(final MessageScope messageScope) {
+    public Iterator<M> receiveMessages() {
         return IteratorUtils.of(this.message);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 79175b1..f030dfe 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -47,7 +47,7 @@ public final class TraverserExecutor {
         final TraverserSet<Object> aliveTraversers = new TraverserSet<>();
         // gather incoming traversers into a traverser set and gain the 'weighted-set' optimization
         final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
-        messenger.receiveMessages(MessageScope.Global.instance()).forEachRemaining(traverserSet -> {
+        messenger.receiveMessages().forEachRemaining(traverserSet -> {
             traverserSet.forEach(traverser -> {
                 traverser.setSideEffects(traversalSideEffects);
                 traverser.attach(vertex);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
index dbf5fa4..0041005 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphMessenger.java
@@ -19,9 +19,9 @@
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph;
 
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -46,7 +46,7 @@ public class GiraphMessenger<M> implements Messenger<M> {
     }
 
     @Override
-    public Iterator<M> receiveMessages(final MessageScope messageScope) {
+    public Iterator<M> receiveMessages() {
         return IteratorUtils.map(this.messages, ObjectWritable::get);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 01153a1..f52843b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -53,7 +53,7 @@ public final class SparkMessenger<M> implements Messenger<M> {
     }
 
     @Override
-    public Iterator<M> receiveMessages(final MessageScope messageScope) {
+    public Iterator<M> receiveMessages() {
         return this.incomingMessages.iterator();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 54f4d82..307722e 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -140,7 +140,7 @@ public class TinkerGraphComputer implements GraphComputer {
                             while (true) {
                                 final Vertex vertex = vertices.next();
                                 if (null == vertex) return;
-                                vertexProgram.execute(vertex, new TinkerMessenger(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), this.memory);
+                                vertexProgram.execute(vertex, new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), this.memory);
                             }
                         });
                         workers.vertexProgramWorkerIterationEnd(this.memory.asImmutable());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java
index 9ba4c38..c5ad072 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessageBoard.java
@@ -18,10 +18,13 @@
  */
 package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -31,9 +34,13 @@ class TinkerMessageBoard<M> {
 
     public Map<Vertex, Queue<M>> sendMessages = new ConcurrentHashMap<>();
     public Map<Vertex, Queue<M>> receiveMessages = new ConcurrentHashMap<>();
+    public Set<MessageScope> previousMessageScopes = new HashSet<>();
+    public Set<MessageScope> currentMessageScopes = new HashSet<>();
 
     public void completeIteration() {
         this.receiveMessages = this.sendMessages;
         this.sendMessages = new ConcurrentHashMap<>();
+        this.previousMessageScopes = this.currentMessageScopes;
+        this.currentMessageScopes = new HashSet<>();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/daec1c3b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
index ca7f89e..673345b 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
@@ -30,10 +30,13 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.StreamFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.MultiIterator;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.stream.Stream;
 
@@ -46,7 +49,6 @@ public class TinkerMessenger<M> implements Messenger<M> {
     private final TinkerMessageBoard<M> messageBoard;
     private final MessageCombiner<M> combiner;
 
-
     public TinkerMessenger(final Vertex vertex, final TinkerMessageBoard<M> messageBoard, final Optional<MessageCombiner<M>> combiner) {
         this.vertex = vertex;
         this.messageBoard = messageBoard;
@@ -54,30 +56,35 @@ public class TinkerMessenger<M> implements Messenger<M> {
     }
 
     @Override
-    public Iterator<M> receiveMessages(final MessageScope messageScope) {
-        if (messageScope instanceof MessageScope.Local) {
-            final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
-            final Traversal.Admin<Vertex, Edge> incidentTraversal = TinkerMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(), this.vertex);
-            final Direction direction = TinkerMessenger.getDirection(incidentTraversal);
-            final Edge[] edge = new Edge[1]; // simulates storage side-effects available in Gremlin, but not Java8 streams
-            return StreamFactory.stream(VertexProgramHelper.reverse(incidentTraversal.asAdmin()))
-                    .map(e -> this.messageBoard.receiveMessages.get((edge[0] = e).vertices(direction).next()))
-                    .filter(q -> null != q)
-                    .flatMap(q -> q.stream())
-                    .map(message -> localMessageScope.getEdgeFunction().apply(message, edge[0]))
-                    .iterator();
+    public Iterator<M> receiveMessages() {
+        final MultiIterator<M> multiIterator = new MultiIterator<>();
+        for (final MessageScope messageScope : this.messageBoard.previousMessageScopes) {
+            if (messageScope instanceof MessageScope.Local) {
+                final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
+                final Traversal.Admin<Vertex, Edge> incidentTraversal = TinkerMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(), this.vertex);
+                final Direction direction = TinkerMessenger.getDirection(incidentTraversal);
+                final Edge[] edge = new Edge[1]; // simulates storage side-effects available in Gremlin, but not Java8 streams
+                multiIterator.addIterator(StreamFactory.stream(VertexProgramHelper.reverse(incidentTraversal.asAdmin()))
+                        .map(e -> this.messageBoard.receiveMessages.get((edge[0] = e).vertices(direction).next()))
+                        .filter(q -> null != q)
+                        .flatMap(q -> q.stream())
+                        .map(message -> localMessageScope.getEdgeFunction().apply(message, edge[0]))
+                        .iterator());
 
-        } else {
-            return Stream.of(this.vertex)
-                    .map(this.messageBoard.receiveMessages::get)
-                    .filter(q -> null != q)
-                    .flatMap(q -> q.stream())
-                    .iterator();
+            } else {
+                multiIterator.addIterator(Stream.of(this.vertex)
+                        .map(this.messageBoard.receiveMessages::get)
+                        .filter(q -> null != q)
+                        .flatMap(q -> q.stream())
+                        .iterator());
+            }
         }
+        return multiIterator;
     }
 
     @Override
     public void sendMessage(final MessageScope messageScope, final M message) {
+        this.messageBoard.currentMessageScopes.add(messageScope);
         if (messageScope instanceof MessageScope.Local) {
             addMessage(this.vertex, message);
         } else {