You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/08 17:52:41 UTC

tinkerpop git commit: Lots of stuff learned. I had a bug in my barrier locking. I have a better model now where locks are released at VoteToHalt. Cleaned up code -- reduced complexity. Better message names to line up with process API semantics.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 3063b99b7 -> 8ef0897f9


Lots of stuff learned. I had a bug in my barrier locking. I have a better model now where locks are released at VoteToHalt. Cleaned up code -- reduced complexity. Better message names to line up with process API semantics.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8ef0897f
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8ef0897f
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8ef0897f

Branch: refs/heads/TINKERPOP-1564
Commit: 8ef0897f9565f5cb64f5e48506ed4c54605a6021
Parents: 3063b99
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Dec 8 10:52:36 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Dec 8 10:52:36 2016 -0700

----------------------------------------------------------------------
 .../process/traversal/util/TraversalMatrix.java |   5 +
 .../akka/DistributedTraversalSideEffects.java   |   4 +-
 .../process/akka/MasterTraversalActor.java      | 109 +++++++++----------
 .../process/akka/TinkerActorSystem.java         |  13 ++-
 .../process/akka/TraverserMailbox.java          |  12 +-
 .../process/akka/WorkerTraversalActor.java      | 104 ++++++++++--------
 .../akka/messages/BarrierAddMessage.java        |  47 ++++++++
 .../akka/messages/BarrierDoneMessage.java       |  40 +++++++
 .../akka/messages/BarrierMergeMessage.java      |  47 --------
 .../messages/BarrierSynchronizationMessage.java |  45 --------
 .../messages/HaltSynchronizationMessage.java    |  36 ------
 .../akka/messages/SideEffectAddMessage.java     |  42 +++++++
 .../akka/messages/SideEffectMergeMessage.java   |  42 -------
 .../process/akka/messages/StartMessage.java     |  32 ++++++
 .../messages/StartSynchronizationMessage.java   |  32 ------
 .../akka/messages/VoteToHaltMessage.java        |  36 ++++++
 16 files changed, 332 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java
index 5b0816d..843a719 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A TraversalMatrix provides random, non-linear access to the steps of a traversal by their step id.
@@ -45,6 +46,10 @@ public final class TraversalMatrix<S, E> {
         return (C) this.matrix.get(stepId);
     }
 
+    public Set<String> getStepIds() {
+        return this.matrix.keySet();
+    }
+
     public Traversal.Admin<S, E> getTraversal() {
         return this.traversal;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
index 8b29a08..61bff7c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
@@ -21,7 +21,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.ActorContext;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
 
 import java.util.Optional;
 import java.util.Set;
@@ -73,7 +73,7 @@ public final class DistributedTraversalSideEffects implements TraversalSideEffec
 
     @Override
     public void add(final String key, final Object value) {
-        this.context.parent().tell(new SideEffectMergeMessage(key, value), this.context.self());
+        this.context.parent().tell(new SideEffectAddMessage(key, value), this.context.self());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 94d9c27..20d770a 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -38,17 +38,15 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMergeMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -60,7 +58,8 @@ public final class MasterTraversalActor extends AbstractActor implements Require
     private final TraversalMatrix<?, ?> matrix;
     private final Partitioner partitioner;
     private List<ActorSelection> workers;
-    private final Map<String, Set<ActorPath>> synchronizationLocks = new HashMap<>();
+    private final Set<ActorPath> haltSynchronization = new HashSet<>();
+    private Barrier barrierLock = null;
 
     public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
         System.out.println("master[created]: " + self().path());
@@ -78,44 +77,37 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                 match(Traverser.Admin.class, traverser -> {
                     this.processTraverser(traverser);
                 }).
-                match(BarrierMergeMessage.class, barrier -> {
-                    final Barrier barrierStep = ((Barrier) this.matrix.getStepById(barrier.getStepId()));
-                    barrierStep.addBarrier(barrier.getBarrier());
-                    broadcast(new BarrierSynchronizationMessage(barrierStep, true));
+                match(BarrierAddMessage.class, barrierMerge -> {
+                    final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
+                    assert null == this.barrierLock || this.barrierLock == barrier;
+                    this.barrierLock = barrier;
+                    this.barrierLock.addBarrier(barrierMerge.getBarrier());
                 }).
-                match(BarrierSynchronizationMessage.class, barrierSync -> {
-                    Set<ActorPath> locks = this.synchronizationLocks.get(barrierSync.getStepId());
-                    if (null == locks) {
-                        locks = new HashSet<>();
-                        this.synchronizationLocks.put(barrierSync.getStepId(), locks);
-                    }
-                    locks.add(sender().path());
-                    if (locks.size() == this.workers.size()) {
-                        final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(barrierSync.getStepId());
-                        this.broadcast(new BarrierSynchronizationMessage((Barrier) step, false));
-                        while (step.hasNext()) {
-                            this.processTraverser(step.next());
-                        }
-                    }
-                }).
-                match(SideEffectMergeMessage.class, sideEffect -> {
-                    this.traversal.getSideEffects().add(sideEffect.getKey(), sideEffect.getValue());
-                    //this.broadcast(new SideEffectMergeMessage(sideEffect.getKey(), sideEffect.getValue()));
+                match(SideEffectAddMessage.class, sideEffect -> {
+                    this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
+                    //this.broadcast(new SideEffectAddMessage(sideEffect.getKey(), sideEffect.getValue()));
                 }).
-                match(HaltSynchronizationMessage.class, haltSync -> {
-                    Set<ActorPath> locks = this.synchronizationLocks.get(Traverser.Admin.HALT);
-                    if (null == locks) {
-                        locks = new HashSet<>();
-                        this.synchronizationLocks.put(Traverser.Admin.HALT, locks);
-                    }
-                    if (haltSync.isHalt()) {
-                        locks.add(sender().path());
-                        if (locks.size() == this.workers.size())
-                            context().system().terminate();
-                    } else {
-                        locks.remove(sender().path());
-                        this.broadcast(new HaltSynchronizationMessage(true));
-                    }
+                match(VoteToHaltMessage.class, haltSync -> {
+                    // receive vote to halt messages from worker
+                    // when all workers have voted to halt then terminate the system
+                    if (haltSync.voteToHalt()) {
+                        this.haltSynchronization.add(sender().path());
+                        if (this.haltSynchronization.size() == this.workers.size()) {
+                            if (null != this.barrierLock) {
+                                final Step<?, ?> step = (Step) this.barrierLock;
+                                while (step.hasNext()) {
+                                    this.sendTraverser(step.next());
+                                }
+                                // broadcast to all workers that the barrier is unlocked
+                                this.broadcast(new BarrierDoneMessage(this.barrierLock));
+                                this.barrierLock = null;
+                                this.haltSynchronization.clear();
+                            } else
+                                context().system().terminate();
+                        }
+                    } else
+                        this.haltSynchronization.remove(sender().path());
+
                 }).build());
     }
 
@@ -127,7 +119,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require
             this.workers.add(context().actorSelection(worker.path()));
         }
         for (final ActorSelection worker : this.workers) {
-            worker.tell(StartSynchronizationMessage.instance(), self());
+            worker.tell(StartMessage.instance(), self());
         }
     }
 
@@ -138,20 +130,25 @@ public final class MasterTraversalActor extends AbstractActor implements Require
     }
 
     private void processTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted() || traverser.get() instanceof Element) {
+            this.sendTraverser(traverser);
+        } else {
+            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+            step.addStart(traverser);
+            while (step.hasNext()) {
+                this.processTraverser(step.next());
+            }
+        }
+    }
+
+    private void sendTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted()) {
             System.out.println("master[result]: " + traverser);
-            broadcast(new HaltSynchronizationMessage(true));
+        } else if (traverser.get() instanceof Element) {
+            final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
+            context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser, self());
         } else {
-            if (traverser.get() instanceof Element) {
-                final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
-                context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser, self());
-            } else {
-                final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
-                step.addStart(traverser);
-                while (step.hasNext()) {
-                    this.processTraverser(step.next());
-                }
-            }
+            self().tell(traverser, self());
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index b60f267..6f83920 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
 import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -65,19 +66,23 @@ public final class TinkerActorSystem {
                 g.V().repeat(both()).times(2).
                         groupCount("a").by("name").
                         cap("a").
-                        select(Column.keys).unfold().limit(3).asAdmin(),
-                // barries work and beyond the local star graph works
+                        unfold().
+                        order().by(Column.values, Order.decr).limit(3).asAdmin(),
+                // barriers work and beyond the local star graph works
                 g.V().repeat(both()).times(2).hasLabel("person").
                         group().
                         by("name").
-                        by(out("created").values("name").dedup().fold()).asAdmin()
+                        by(out("created").values("name").dedup().fold()).asAdmin(),
+                // no results works
+                g.V().out("blah").asAdmin()
         );
         for (final Traversal.Admin<?, ?> traversal : traversals) {
             System.out.println("EXECUTING: " + traversal.getBytecode());
-            final TinkerActorSystem actors = new TinkerActorSystem(traversal);
+            final TinkerActorSystem actors = new TinkerActorSystem(traversal.clone());
             while (!actors.system.isTerminated()) {
 
             }
+            //System.out.println(traversal.toList());
             System.out.println("//////////////////////////////////\n");
         }
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index e0f84dc..f9343e6 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -28,9 +28,9 @@ import akka.dispatch.ProducesMessageQueue;
 import com.typesafe.config.Config;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
 import scala.Option;
 
 import java.util.Queue;
@@ -57,11 +57,11 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
         public void enqueue(final ActorRef receiver, final Envelope handle) {
             if (handle.message() instanceof Traverser.Admin)
                 this.traverserSet.offer((Traverser.Admin) handle.message());
-            else if (handle.message() instanceof SideEffectMergeMessage)
+            else if (handle.message() instanceof SideEffectAddMessage)
                 this.queue.offer(handle);
-            else if (handle.message() instanceof BarrierSynchronizationMessage)
+            else if (handle.message() instanceof BarrierDoneMessage)
                 this.barrierSyncs.offer(handle);
-            else if (handle.message() instanceof HaltSynchronizationMessage)
+            else if (handle.message() instanceof VoteToHaltMessage)
                 this.haltSyncs.offer(handle);
             else
                 this.queue.offer(handle);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 20475ef..f280678 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -20,6 +20,7 @@
 package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.dispatch.RequiresMessageQueue;
 import akka.japi.pf.ReceiveBuilder;
@@ -27,17 +28,16 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMergeMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -48,12 +48,14 @@ import java.util.Map;
 public final class WorkerTraversalActor extends AbstractActor implements
         RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
 
-    private final TraversalMatrix<?, ?> matrix;
+    private TraversalMatrix<?, ?> matrix = null;
     private final Partition localPartition;
     private final Partitioner partitioner;
-    private boolean sentHaltMessage = false;
+    private boolean voteToHalt = false;
     private final Map<String, ActorSelection> workers = new HashMap<>();
 
+    private Barrier barrierLock = null;
+
     public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
         System.out.println("worker[created]: " + self().path());
         this.matrix = new TraversalMatrix<>(traversal);
@@ -63,40 +65,68 @@ public final class WorkerTraversalActor extends AbstractActor implements
         ((GraphStep) traversal.getStartStep()).setIteratorSupplier(localPartition::vertices);
 
         receive(ReceiveBuilder.
-                match(StartSynchronizationMessage.class, start -> {
+                match(StartMessage.class, start -> {
+                    // initial message from master that says: "start processing"
                     final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
                     while (step.hasNext()) {
-                        this.processTraverser(step.next());
+                        this.sendTraverser(step.next());
                     }
+                    // internal vote to have in mailbox as final message to process
+                    self().tell(new VoteToHaltMessage(true), self());
                 }).
-                match(BarrierSynchronizationMessage.class, barrierSync -> {
-                    final Barrier barrier = this.matrix.getStepById(barrierSync.getStepId());
-                    if (barrierSync.getLock()) {
-                        this.processBarrier(barrier);
-                    } else {
-                        barrier.done();
+                match(Traverser.Admin.class, traverser -> {
+                    if (this.voteToHalt) {
+                        // tell master you no longer want to halt
+                        master().tell(new VoteToHaltMessage(false), self());
+                        // internal vote to have in mailbox as final message to process
+                        self().tell(new VoteToHaltMessage(true), self());
+                        this.voteToHalt = false;
                     }
+                    this.processTraverser(traverser);
                 }).
-                match(SideEffectMergeMessage.class, sideEffect -> {
+                match(SideEffectAddMessage.class, sideEffect -> {
                     // TODO: sideEffect.setSideEffect(this.matrix.getTraversal());
                 }).
-                match(HaltSynchronizationMessage.class, haltSync -> {
-                    sender().tell(new HaltSynchronizationMessage(true), self());
-                    this.sentHaltMessage = true;
+                match(BarrierDoneMessage.class, barrierSync -> {
+                    // barrier is complete and processing can continue
+                    if (null != this.barrierLock) {
+                        this.barrierLock.done();
+                        this.barrierLock = null;
+                        // internal vote to have in mailbox as final message to process
+                        self().tell(new VoteToHaltMessage(true), self());
+                    }
                 }).
-                match(Traverser.Admin.class, traverser -> {
-                    if (this.sentHaltMessage) {
-                        context().parent().tell(new HaltSynchronizationMessage(false), self());
-                        this.sentHaltMessage = false;
+                match(VoteToHaltMessage.class, haltSync -> {
+                    // if there is a barrier and thus, halting at barrier, then process barrier
+                    if (null != this.barrierLock) {
+                        while (this.barrierLock.hasNextBarrier()) {
+                            master().tell(new BarrierAddMessage(this.barrierLock), self());
+                        }
                     }
-                    this.processTraverser(traverser);
+                    // the final message in the worker mail box, tell master you are done processing messages
+                    master().tell(new VoteToHaltMessage(true), self());
+                    this.voteToHalt = true;
                 }).build()
         );
     }
 
     private void processTraverser(final Traverser.Admin traverser) {
+        assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
+        final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+        step.addStart(traverser);
+        if (step instanceof Barrier) {
+            assert null == this.barrierLock || step == this.barrierLock;
+            this.barrierLock = (Barrier) step;
+        } else {
+            while (step.hasNext()) {
+                this.sendTraverser(step.next());
+            }
+        }
+    }
+
+    private void sendTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted())
-            context().parent().tell(traverser, self());
+            master().tell(traverser, self());
         else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
             final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
             final String workerPathString = "../worker-" + otherPartition.hashCode();
@@ -106,25 +136,11 @@ public final class WorkerTraversalActor extends AbstractActor implements
                 this.workers.put(workerPathString, worker);
             }
             worker.tell(traverser, self());
-        } else {
-            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
-            step.addStart(traverser);
-            if (step instanceof Barrier) {
-                this.processBarrier((Barrier) step);
-            } else {
-                while (step.hasNext()) {
-                    this.processTraverser(step.next());
-                }
-            }
-        }
+        } else
+            self().tell(traverser, self());
     }
 
-    private void processBarrier(final Barrier barrier) {
-        if (barrier instanceof Bypassing)
-            ((Bypassing) barrier).setBypass(true);
-        while (barrier.hasNextBarrier()) {
-            context().parent().tell(new BarrierMergeMessage(barrier), self());
-        }
-        context().parent().tell(new BarrierSynchronizationMessage(barrier, true), self());
+    private ActorRef master() {
+        return context().parent();
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierAddMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierAddMessage.java
new file mode 100644
index 0000000..4e204f1
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierAddMessage.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierAddMessage {
+
+    private final Object barrier;
+    private final String stepId;
+
+    public BarrierAddMessage(final Barrier barrier) {
+        this.barrier = barrier.nextBarrier();
+        this.stepId = ((Step) barrier).getId();
+    }
+
+    public Object getBarrier() {
+        return this.barrier;
+    }
+
+    public String getStepId() {
+        return this.stepId;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierDoneMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierDoneMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierDoneMessage.java
new file mode 100644
index 0000000..73c0611
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierDoneMessage.java
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/** Immutable synchronization message that identifies a barrier step by its id; it carries no barrier data.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierDoneMessage implements SynchronizationMessage {
+
+    private final String stepId; // id read from the Barrier passed to the constructor
+
+    public BarrierDoneMessage(final Barrier barrier) {
+        this.stepId = ((Step) barrier).getId(); // unchecked cast: ClassCastException if the Barrier is not a Step
+    }
+
+    public String getStepId() { // returns the id captured at construction
+        return this.stepId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
deleted file mode 100644
index 55e3e70..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierMergeMessage {
-
-    private final Object barrier;
-    private final String stepId;
-
-    public BarrierMergeMessage(final Barrier barrier) {
-        this.barrier = barrier.nextBarrier();
-        this.stepId = ((Step) barrier).getId();
-    }
-
-    public Object getBarrier() {
-        return this.barrier;
-    }
-
-    public String getStepId() {
-        return this.stepId;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
deleted file mode 100644
index 467ac28..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierSynchronizationMessage implements SynchronizationMessage {
-
-    private final String stepId;
-    private final boolean lock;
-
-    public BarrierSynchronizationMessage(final Barrier barrier, final boolean lock) {
-        this.stepId = ((Step) barrier).getId();
-        this.lock = lock;
-    }
-
-    public String getStepId() {
-        return this.stepId;
-    }
-
-    public boolean getLock() {
-        return this.lock;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
deleted file mode 100644
index f3419a0..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HaltSynchronizationMessage implements SynchronizationMessage {
-
-    private final boolean halt;
-
-    public HaltSynchronizationMessage(final boolean halt) {
-        this.halt = halt;
-    }
-
-    public boolean isHalt() {
-        return this.halt;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectAddMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectAddMessage.java
new file mode 100644
index 0000000..f010dbb
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectAddMessage.java
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/** Immutable message pairing a side-effect key with a value object; both are stored exactly as given.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectAddMessage {
+
+    private final String sideEffectKey; // key supplied by the sender; not validated or null-checked here
+    private final Object sideEffect; // value supplied by the sender; the reference is kept, no copy is made
+
+    public SideEffectAddMessage(final String sideEffectKey, final Object sideEffect) {
+        this.sideEffect = sideEffect;
+        this.sideEffectKey = sideEffectKey;
+    }
+
+    public String getSideEffectKey() { // returns the key passed to the constructor
+        return this.sideEffectKey;
+    }
+
+    public Object getSideEffectValue() { // returns the value passed to the constructor (field is named sideEffect)
+        return this.sideEffect;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
deleted file mode 100644
index 1cb6866..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectMergeMessage {
-
-    private final String sideEffectKey;
-    private final Object sideEffect;
-
-    public SideEffectMergeMessage(final String sideEffectKey, final Object sideEffect) {
-        this.sideEffect = sideEffect;
-        this.sideEffectKey = sideEffectKey;
-    }
-
-    public String getKey() {
-        return this.sideEffectKey;
-    }
-
-    public Object getValue() {
-        return this.sideEffect;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
new file mode 100644
index 0000000..7acb251
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
@@ -0,0 +1,32 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/** Field-less message type that exposes one shared instance through {@link #instance()}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StartMessage { // NOTE(review): no private constructor, so new StartMessage() is still possible - confirm intent
+
+    private static final StartMessage INSTANCE = new StartMessage(); // created once when the class is initialized
+
+    public static StartMessage instance() { // always returns the same INSTANCE object
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
deleted file mode 100644
index bab03f5..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class StartSynchronizationMessage {
-
-    private static final StartSynchronizationMessage INSTANCE = new StartSynchronizationMessage();
-
-    public static StartSynchronizationMessage instance() {
-        return INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
new file mode 100644
index 0000000..ca5929f
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/** Immutable synchronization message wrapping a single boolean flag.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VoteToHaltMessage implements SynchronizationMessage {
+
+    private final boolean voteToHalt; // presumably true = sender votes to halt; confirm against the actors that read it
+
+    public VoteToHaltMessage(final boolean voteToHalt) {
+        this.voteToHalt = voteToHalt;
+    }
+
+    public boolean voteToHalt() { // plain accessor despite the verb-like name: returns the flag, performs no action
+        return this.voteToHalt;
+    }
+}