You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/13 16:52:48 UTC

[2/2] tinkerpop git commit: we now have a full generalized process/actor interface system where akka-gremlin/ implements those interfaces. Next, we have ActorProgram with TraversalActorProgram exectuing a traversal. This is identical in form to GraphComp

we now have a full generalized process/actor interface system where akka-gremlin/ implements those interfaces. Next, we have ActorProgram with TraversalActorProgram exectuing a traversal. This is identical in form to GraphComputer where people can submit arbitrary ActorPrograms. This is really clean and consistent with our other work.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/53bab282
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/53bab282
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/53bab282

Branch: refs/heads/TINKERPOP-1564
Commit: 53bab282ec8d80e319801e7bf40ee42f910fd5ed
Parents: 07347b8
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Dec 13 09:52:41 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Dec 13 09:52:41 2016 -0700

----------------------------------------------------------------------
 .../gremlin/akka/process/actor/AkkaActors.java  |  22 ++-
 .../process/actor/MasterTraversalActor.java     | 162 +++------------
 .../akka/process/actor/TraverserMailbox.java    |   5 +-
 .../process/actor/WorkerTraversalActor.java     | 180 ++++-------------
 .../actor/WorkerTraversalSideEffects.java       | 147 --------------
 .../actor/message/BarrierAddMessage.java        |  47 -----
 .../actor/message/BarrierDoneMessage.java       |  41 ----
 .../actor/message/SideEffectAddMessage.java     |  43 ----
 .../actor/message/SideEffectSetMessage.java     |  42 ----
 .../process/actor/message/StartMessage.java     |  35 ----
 .../actor/message/VoteToHaltMessage.java        |  36 ----
 .../gremlin/akka/process/AkkaPlayTest.java      |   2 +-
 .../tinkerpop/gremlin/process/actor/Actor.java  |  30 ++-
 .../gremlin/process/actor/ActorProgram.java     |  50 +++++
 .../tinkerpop/gremlin/process/actor/Actors.java |   2 +
 .../gremlin/process/actor/Address.java          |  62 ++++++
 .../gremlin/process/actor/MasterActor.java      |  35 ----
 .../gremlin/process/actor/WorkerActor.java      |  35 ----
 .../actor/traversal/TraversalActorProgram.java  |  66 +++++++
 .../actor/traversal/TraversalMasterProgram.java | 154 +++++++++++++++
 .../actor/traversal/TraversalWorkerProgram.java | 196 +++++++++++++++++++
 .../traversal/WorkerTraversalSideEffects.java   | 147 ++++++++++++++
 .../traversal/message/BarrierAddMessage.java    |  47 +++++
 .../traversal/message/BarrierDoneMessage.java   |  41 ++++
 .../traversal/message/SideEffectAddMessage.java |  43 ++++
 .../traversal/message/SideEffectSetMessage.java |  42 ++++
 .../actor/traversal/message/StartMessage.java   |  35 ++++
 .../traversal/message/VoteToHaltMessage.java    |  36 ++++
 .../actor/traversal/step/map/ActorStep.java     |   4 +-
 29 files changed, 1035 insertions(+), 752 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
index 8ef96bb..db024f6 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -21,8 +21,9 @@ package org.apache.tinkerpop.gremlin.akka.process.actor;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actor.Actors;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
@@ -34,12 +35,19 @@ import java.util.concurrent.Future;
  */
 public final class AkkaActors<S, E> implements Actors<S, E> {
 
-    public final ActorSystem system;
-    private TraverserSet<E> results = new TraverserSet<>();
+    private final ActorProgram actorProgram;
+    private final ActorSystem system;
+    private final Address.Master master;
 
-    public AkkaActors(final Traversal.Admin<S, E> traversal, final Partitioner partitioner) {
-        this.system = ActorSystem.create("traversal-" + traversal.hashCode());
-        this.system.actorOf(Props.create(MasterTraversalActor.class, traversal.clone(), partitioner, this.results), "master");
+    public AkkaActors(final ActorProgram actorProgram, final Partitioner partitioner) {
+        this.actorProgram = actorProgram;
+        this.system = ActorSystem.create("traversal-" + actorProgram.hashCode());
+        this.master = new Address.Master(this.system.actorOf(Props.create(MasterTraversalActor.class, this.actorProgram, partitioner), "master").path().toString());
+    }
+
+    @Override
+    public Address.Master master() {
+        return this.master;
     }
 
     @Override
@@ -48,7 +56,7 @@ public final class AkkaActors<S, E> implements Actors<S, E> {
             while (!this.system.isTerminated()) {
 
             }
-            return this.results;
+            return (TraverserSet) this.actorProgram.getResult();
         });
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
index 5009554..6799a28 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
@@ -20,35 +20,17 @@
 package org.apache.tinkerpop.gremlin.akka.process.actor;
 
 import akka.actor.AbstractActor;
-import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.RequiresMessageQueue;
 import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
-import org.apache.tinkerpop.gremlin.process.actor.MasterActor;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,131 +38,47 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, MasterActor {
+public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master {
 
-    private final Traversal.Admin<?, ?> traversal;
-    private final TraversalMatrix<?, ?> matrix;
-    private final Partitioner partitioner;
-    private final Map<String, ActorSelection> workers = new HashMap<>();
-    private Map<String, Barrier> barriers = new HashMap<>();
-    private final TraverserSet<?> results;
-    private final String leaderWorker;
+    private final Address.Master master;
+    private final List<Address.Worker> workers;
+    private final Map<Address, ActorSelection> actors = new HashMap<>();
 
-    public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
-        System.out.println("master[created]: " + self().path());
-        final TraversalStrategies strategies = traversal.getStrategies().clone();
-        strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class);
-        strategies.addStrategies(ActorVerificationStrategy.instance());
-        traversal.setStrategies(strategies);
-        traversal.applyStrategies();
-
-        this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
-        System.out.println(this.traversal);
-        this.matrix = new TraversalMatrix<>(this.traversal);
-        this.partitioner = partitioner;
-        this.results = results;
-        this.initializeWorkers();
-        this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode();
-
-        receive(ReceiveBuilder.
-                match(Traverser.Admin.class, this::processTraverser).
-                match(BarrierAddMessage.class, barrierMerge -> this.processBarrierAdd((Barrier) this.matrix.getStepById(barrierMerge.getStepId()), barrierMerge.getBarrier())).
-                match(SideEffectAddMessage.class, sideEffect -> this.processSideEffectAdd(((SideEffectAddMessage) sideEffect).getKey(), ((SideEffectAddMessage) sideEffect).getValue())).
-                match(VoteToHaltMessage.class, voteToHalt -> this.processVoteToHalt()).
-                build());
-    }
-
-    private void initializeWorkers() {
-        final List<Partition> partitions = this.partitioner.getPartitions();
+    public MasterTraversalActor(final ActorProgram program, final Partitioner partitioner) {
+        this.master = new Address.Master(self().path().toString());
+        this.workers = new ArrayList<>();
+        final List<Partition> partitions = partitioner.getPartitions();
         for (final Partition partition : partitions) {
-            final String workerPathString = "worker-" + partition.hashCode();
-            final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), workerPathString);
-            this.workers.put(workerPathString, context().actorSelection(worker.path()));
-        }
-        for (final ActorSelection worker : this.workers.values()) {
-            worker.tell(StartMessage.instance(), self());
+            this.workers.add(new Address.Worker("worker-" + partition.hashCode()));
+            context().actorOf(Props.create(WorkerTraversalActor.class, program, partitioner, partition), "worker-" + partition.hashCode());
         }
-        this.workers.clear();
+        final ActorProgram.Master masterProgram = program.createMasterProgram(this);
+        receive(ReceiveBuilder.matchAny(masterProgram::execute).build());
+        masterProgram.setup();
     }
 
     @Override
-    public void processBarrierAdd(final Barrier barrier, final Object barrierAddition) {
-        final Step<?, ?> step = (Step) barrier;
-        GraphComputing.atMaster(step, true);
-        barrier.addBarrier(barrierAddition);
-        this.barriers.put(step.getId(), barrier);
+    public <M> void send(final Address toActor, final M message) {
+        ActorSelection actor = this.actors.get(toActor);
+        if (null == actor) {
+            actor = context().actorSelection(toActor.location());
+            this.actors.put(toActor, actor);
+        }
+        actor.tell(message, self());
     }
 
     @Override
-    public void processSideEffectAdd(final String key, final Object value) {
-        this.traversal.getSideEffects().add(key, value);
+    public List<Address.Worker> workers() {
+        return this.workers;
     }
 
     @Override
-    public void processVoteToHalt() {
-        assert !sender().equals(self());
-        if (!this.barriers.isEmpty()) {
-            for (final Barrier barrier : this.barriers.values()) {
-                final Step<?, ?> step = (Step) barrier;
-                if (!(barrier instanceof LocalBarrier)) {
-                    while (step.hasNext()) {
-                        this.sendTraverser(step.next());
-                    }
-                } else {
-                    this.traversal.getSideEffects().forEach((k, v) -> {
-                        this.broadcast(new SideEffectSetMessage(k, v));
-                    });
-                    this.broadcast(new BarrierDoneMessage(barrier));
-                    barrier.done();
-                }
-            }
-            this.barriers.clear();
-            worker(this.leaderWorker).tell(StartMessage.instance(), self());
-        } else {
-            while (this.traversal.hasNext()) {
-                this.results.add((Traverser.Admin) this.traversal.nextTraverser());
-            }
-            context().system().terminate();
-        }
+    public Address.Master address() {
+        return this.master;
     }
 
     @Override
-    public void processTraverser(final Traverser.Admin traverser) {
-        if (traverser.isHalted() || traverser.get() instanceof Element) {
-            this.sendTraverser(traverser);
-        } else {
-            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
-            GraphComputing.atMaster(step, true);
-            step.addStart(traverser);
-            while (step.hasNext()) {
-                this.processTraverser(step.next());
-            }
-        }
-    }
-
-    ////////////////
-
-    private void broadcast(final Object message) {
-        for (final Partition partition : this.partitioner.getPartitions()) {
-            worker("worker-" + partition.hashCode()).tell(message, self());
-        }
-    }
-
-    private void sendTraverser(final Traverser.Admin traverser) {
-        if (traverser.isHalted())
-            this.results.add(traverser);
-        else if (traverser.get() instanceof Element)
-            worker("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()).tell(traverser, self());
-        else
-            self().tell(traverser, self());
-    }
-
-    private ActorSelection worker(final String workerPath) {
-        ActorSelection worker = this.workers.get(workerPath);
-        if (null == worker) {
-            worker = context().actorSelection(workerPath);
-            this.workers.put(workerPath, worker);
-        }
-        return worker;
+    public void close() {
+        context().system().terminate();
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
index 6a6c0f4..671f3a2 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
@@ -26,7 +26,8 @@ import akka.dispatch.MailboxType;
 import akka.dispatch.MessageQueue;
 import akka.dispatch.ProducesMessageQueue;
 import com.typesafe.config.Config;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalWorkerProgram;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import scala.Option;
@@ -58,7 +59,7 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
                 else if (handle.message() instanceof VoteToHaltMessage) {
                     assert null == this.haltMessage;
                     this.haltMessage = handle;
-                } else if (handle.message() instanceof WorkerTraversalActor.Terminate) {
+                } else if (handle.message() instanceof TraversalWorkerProgram.Terminate) {
                     assert null == this.terminateToken;
                     this.terminateToken = handle;
                 } else

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
index 862b6b3..5a6bae7 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
@@ -20,184 +20,72 @@
 package org.apache.tinkerpop.gremlin.akka.process.actor;
 
 import akka.actor.AbstractActor;
-import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.dispatch.RequiresMessageQueue;
 import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
-import org.apache.tinkerpop.gremlin.process.actor.WorkerActor;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, WorkerActor {
+public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker {
 
-    // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm)
-    public enum Terminate {
-        MAYBE, YES, NO
-    }
-
-    private final TraversalMatrix<?, ?> matrix;
     private final Partition localPartition;
-    private final Partitioner partitioner;
-    //
-    private final Map<String, ActorSelection> workers = new HashMap<>();
-    private final String neighborWorker;
-    private boolean isLeader;
-    private Terminate terminate = null;
-    private boolean voteToHalt = false;
-    private Map<String, Barrier> barriers = new HashMap<>();
+    private final Address.Worker self;
+    private final Address.Master master;
+    private final List<Address.Worker> workers;
+    private final Map<Address, ActorSelection> actors = new HashMap<>();
 
-    public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
-        System.out.println("worker[created]: " + self().path());
-        // set up partition and traversal information
+    public WorkerTraversalActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) {
         this.localPartition = localPartition;
-        this.partitioner = partitioner;
-        final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), context());
-        TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
-        this.matrix = new TraversalMatrix<>(traversal);
-        final GraphStep graphStep = (GraphStep) traversal.getStartStep();
-        if (0 == graphStep.getIds().length)
-            ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges);
-        else {
-            if (graphStep.returnsVertex())
-                ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
-                        () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains));
-            else
-                ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
-                        () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains));
+        this.self = new Address.Worker(self().path().toString());
+        this.master = new Address.Master(context().parent().path().toString());
+        this.workers = new ArrayList<>();
+        for (final Partition partition : partitioner.getPartitions()) {
+            this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
         }
-        // create termination ring topology
-        final int i = this.partitioner.getPartitions().indexOf(this.localPartition);
-        this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
-        this.isLeader = i == 0;
-
-        receive(ReceiveBuilder.
-                match(StartMessage.class, start -> this.processStart()).
-                match(Traverser.Admin.class, this::processTraverser).
-                match(SideEffectSetMessage.class, sideEffect -> this.processSideEffectSet(sideEffect.getKey(), sideEffect.getValue())).
-                match(BarrierDoneMessage.class, barrierDone -> this.processBarrierDone(this.matrix.getStepById(barrierDone.getStepId()))).
-                match(Terminate.class, terminate -> {
-                    assert this.isLeader || this.terminate != Terminate.MAYBE;
-                    this.terminate = terminate;
-                    self().tell(VoteToHaltMessage.instance(), self());
-                }).
-                match(VoteToHaltMessage.class, haltSync -> {
-                    // if there is a barrier and thus, halting at barrier, then process barrier
-                    if (!this.barriers.isEmpty()) {
-                        for (final Barrier barrier : this.barriers.values()) {
-                            while (barrier.hasNextBarrier()) {
-                                master().tell(new BarrierAddMessage(barrier), self());
-                            }
-                        }
-                        this.barriers.clear();
-                        this.voteToHalt = false;
-                    }
-                    // use termination token to determine termination condition
-                    if (null != this.terminate) {
-                        if (this.isLeader) {
-                            if (this.voteToHalt && Terminate.YES == this.terminate)
-                                master().tell(VoteToHaltMessage.instance(), self());
-                            else
-                                worker(this.neighborWorker).tell(Terminate.YES, self());
-                        } else
-                            worker(this.neighborWorker).tell(this.voteToHalt ? this.terminate : Terminate.NO, self());
-                        this.terminate = null;
-                        this.voteToHalt = true;
-                    }
-                }).build()
-        );
+        ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
+        receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
+        workerProgram.setup();
     }
 
     @Override
-    public void processStart() {
-        // initial message from master that says: "start processing"
-        final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
-        while (step.hasNext()) {
-            this.sendTraverser(step.next());
-        }
-        // internal vote to have in mailbox as final message to process
-        assert null == this.terminate;
-        if (this.isLeader) {
-            this.terminate = Terminate.MAYBE;
-            self().tell(VoteToHaltMessage.instance(), self());
+    public <M> void send(final Address toActor, final M message) {
+        ActorSelection actor = this.actors.get(toActor);
+        if (null == actor) {
+            actor = context().actorSelection(toActor.location());
+            this.actors.put(toActor, actor);
         }
+        actor.tell(message, self());
     }
 
     @Override
-    public void processTraverser(final Traverser.Admin traverser) {
-        assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
-        final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
-        if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
-        GraphComputing.atMaster(step, false);
-        step.addStart(traverser);
-        if (step instanceof Barrier) {
-            this.barriers.put(step.getId(), (Barrier) step);
-        } else {
-            while (step.hasNext()) {
-                this.sendTraverser(step.next());
-            }
-        }
+    public List<Address.Worker> workers() {
+        return this.workers;
     }
 
     @Override
-    public void processBarrierDone(final Barrier barrier) {
-        final Step<?, ?> step = (Step) barrier;
-        while (step.hasNext()) {
-            sendTraverser(step.next());
-        }
+    public Partition partition() {
+        return this.localPartition;
     }
 
     @Override
-    public void processSideEffectSet(final String key, final Object value) {
-        this.matrix.getTraversal().getSideEffects().set(key, value);
+    public Address.Worker address() {
+        return this.self;
     }
 
-    //////////////////////
-
-    private void sendTraverser(final Traverser.Admin traverser) {
-        this.voteToHalt = false;
-        if (traverser.isHalted())
-            master().tell(traverser, self());
-        else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get()))
-            worker("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()).tell(traverser, self());
-        else
-            self().tell(traverser, self());
-    }
-
-    private ActorSelection worker(final String workerPath) {
-        ActorSelection worker = this.workers.get(workerPath);
-        if (null == worker) {
-            worker = context().actorSelection(workerPath);
-            this.workers.put(workerPath, worker);
-        }
-        return worker;
-    }
-
-    private ActorRef master() {
-        return context().parent();
+    @Override
+    public Address.Master master() {
+        return this.master;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
deleted file mode 100644
index 9c03298..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.ActorContext;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.BinaryOperator;
-import java.util.function.Supplier;
-import java.util.function.UnaryOperator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class WorkerTraversalSideEffects implements TraversalSideEffects {
-
-    private TraversalSideEffects sideEffects;
-    private ActorContext context;
-
-
-    private WorkerTraversalSideEffects() {
-        // for serialization
-    }
-
-    public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext context) {
-        this.sideEffects = sideEffects;
-        this.context = context;
-    }
-
-    public TraversalSideEffects getSideEffects() {
-        return this.sideEffects;
-    }
-
-    @Override
-    public void set(final String key, final Object value) {
-        this.sideEffects.set(key, value);
-    }
-
-    @Override
-    public <V> V get(final String key) throws IllegalArgumentException {
-        return this.sideEffects.get(key);
-    }
-
-    @Override
-    public void remove(final String key) {
-        this.sideEffects.remove(key);
-    }
-
-    @Override
-    public Set<String> keys() {
-        return this.sideEffects.keys();
-    }
-
-    @Override
-    public void add(final String key, final Object value) {
-        this.context.parent().tell(new SideEffectAddMessage(key, value), this.context.self());
-    }
-
-    @Override
-    public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
-        this.sideEffects.register(key, initialValue, reducer);
-    }
-
-    @Override
-    public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
-        this.sideEffects.registerIfAbsent(key, initialValue, reducer);
-    }
-
-    @Override
-    public <V> BinaryOperator<V> getReducer(final String key) {
-        return this.sideEffects.getReducer(key);
-    }
-
-    @Override
-    public <V> Supplier<V> getSupplier(final String key) {
-        return this.sideEffects.getSupplier(key);
-    }
-
-    @Override
-    @Deprecated
-    public void registerSupplier(final String key, final Supplier supplier) {
-        this.sideEffects.registerSupplier(key, supplier);
-    }
-
-    @Override
-    @Deprecated
-    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
-        return this.sideEffects.getRegisteredSupplier(key);
-    }
-
-    @Override
-    public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
-        this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
-    }
-
-    @Override
-    public <S> Supplier<S> getSackInitialValue() {
-        return this.sideEffects.getSackInitialValue();
-    }
-
-    @Override
-    public <S> UnaryOperator<S> getSackSplitter() {
-        return this.sideEffects.getSackSplitter();
-    }
-
-    @Override
-    public <S> BinaryOperator<S> getSackMerger() {
-        return this.sideEffects.getSackMerger();
-    }
-
-    @Override
-    public TraversalSideEffects clone() {
-        try {
-            final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone();
-            clone.sideEffects = this.sideEffects.clone();
-            return clone;
-        } catch (final CloneNotSupportedException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void mergeInto(final TraversalSideEffects sideEffects) {
-        this.sideEffects.mergeInto(sideEffects);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
deleted file mode 100644
index 4a351c1..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierAddMessage {
-
-    private final Object barrier;
-    private final String stepId;
-
-    public BarrierAddMessage(final Barrier barrier) {
-        this.barrier = barrier.nextBarrier();
-        this.stepId = ((Step) barrier).getId();
-    }
-
-    public Object getBarrier() {
-        return this.barrier;
-    }
-
-    public String getStepId() {
-        return this.stepId;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
deleted file mode 100644
index 208b346..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierDoneMessage {
-
-    private final String stepId;
-
-    public BarrierDoneMessage(final Barrier barrier) {
-        this.stepId = ((Step) barrier).getId();
-
-    }
-
-    public String getStepId() {
-        return this.stepId;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
deleted file mode 100644
index 2d97bfa..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectAddMessage {
-
-    private final String key;
-    private final Object value;
-
-    public SideEffectAddMessage(final String key, final Object value) {
-        this.value = value;
-        this.key = key;
-    }
-
-    public String getKey() {
-        return this.key;
-    }
-
-    public Object getValue() {
-        return this.value;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
deleted file mode 100644
index 023133b..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectSetMessage {
-
-    private final String key;
-    private final Object value;
-
-    public SideEffectSetMessage(final String key, final Object value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    public String getKey() {
-        return this.key;
-    }
-
-    public Object getValue() {
-        return this.value;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
deleted file mode 100644
index ebc469c..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class StartMessage {
-
-    private static final StartMessage INSTANCE = new StartMessage();
-
-    private StartMessage() {
-    }
-
-    public static StartMessage instance() {
-        return INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
deleted file mode 100644
index 8bfa4c9..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class VoteToHaltMessage {
-
-    private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage();
-
-    private VoteToHaltMessage() {
-    }
-
-    public static VoteToHaltMessage instance() {
-        return INSTANCE;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
index bed7636..f94083c 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
@@ -42,7 +42,7 @@ public class AkkaPlayTest {
         final Graph graph = TinkerGraph.open();
         graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
         GraphTraversalSource g = graph.traversal().withStrategies(new ActorStrategy(AkkaActors.class, new HashPartitioner(graph.partitioner(), 3)));
-        System.out.println(g.V(1, 2).union(outE().count(), inE().count(), (Traversal) outE().values("weight").sum()).toList());
+        System.out.println(g.V().values("name").toList());
         //3, 1.9, 1
         /*for (int i = 0; i < 10000; i++) {
             final Graph graph = TinkerGraph.open();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
index aa2f429..2552883 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
@@ -19,13 +19,39 @@
 
 package org.apache.tinkerpop.gremlin.process.actor;
 
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+
+import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public interface Actor {
 
-    public <S> void processTraverser(final Traverser.Admin<S> traverser);
+    public Address address();
+
+    public <M> void send(final Address toActor, final M message);
+
+    public interface Master extends Actor {
+
+        public List<Address.Worker> workers();
+
+        public Address.Master address();
+
+        public void close();
+
+    }
+
+    public interface Worker extends Actor {
+
+        public Address.Worker address();
+
+        public Address.Master master();
+
+        public List<Address.Worker> workers();
+
+        public Partition partition();
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
new file mode 100644
index 0000000..b8f7ac1
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface ActorProgram<M> {
+
+    public Worker createWorkerProgram(final Actor.Worker worker);
+
+    public Master createMasterProgram(final Actor.Master master);
+
+    public M getResult();
+
+    public static interface Worker<M> {
+        public void setup();
+
+        public void execute(final M message);
+
+        public void terminate();
+
+    }
+
+    public static interface Master<M> {
+        public void setup();
+
+        public void execute(final M message);
+
+        public void terminate();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
index d9e257e..2e410ec 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
@@ -28,5 +28,7 @@ import java.util.concurrent.Future;
  */
 public interface Actors<S, E> {
 
+    public Address.Master master();
+
     public Future<TraverserSet<E>> submit();
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
new file mode 100644
index 0000000..c598eb7
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
@@ -0,0 +1,62 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class Address implements Serializable {
+
+    private final String location;
+
+    public Address(final String location) {
+        this.location = location;
+    }
+
+    public String location() {
+        return this.location;
+    }
+
+    public boolean equals(final Object other) {
+        return other instanceof Address && ((Address) other).location.equals(this.location);
+    }
+
+    public int hashCode() {
+        return this.location.hashCode();
+    }
+
+    public static class Master extends Address {
+
+        public Master(final String location) {
+            super(location);
+        }
+
+    }
+
+    public static class Worker extends Address {
+
+        public Worker(final String location) {
+            super(location);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
deleted file mode 100644
index 87efe51..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface MasterActor extends Actor {
-
-    public <V> void processBarrierAdd(final Barrier barrier, final V barrierAddition);
-
-    public <V> void processSideEffectAdd(final String key, final V value);
-
-    public void processVoteToHalt();
-
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
deleted file mode 100644
index 6d4ca64..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface WorkerActor extends Actor {
-
-    public void processStart();
-
-    public void processBarrierDone(final Barrier barrier);
-
-    public void processSideEffectSet(final String key, final Object value);
-
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
new file mode 100644
index 0000000..278fb3b
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalActorProgram<M> implements ActorProgram<M> {
+
+    private final Traversal.Admin<?, ?> traversal;
+    private final Partitioner partitioner;
+    public TraverserSet<?> result = new TraverserSet<>();
+
+    public TraversalActorProgram(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+        this.partitioner = partitioner;
+        final TraversalStrategies strategies = traversal.getStrategies().clone();
+        strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class);
+        strategies.addStrategies(ActorVerificationStrategy.instance());
+        traversal.setStrategies(strategies);
+        traversal.applyStrategies();
+        this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
+    }
+
+    @Override
+    public Worker<M> createWorkerProgram(final Actor.Worker worker) {
+        return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner);
+    }
+
+    @Override
+    public Master createMasterProgram(final Actor.Master master) {
+        return new TraversalMasterProgram<>(master, this.traversal.clone(), this.partitioner, this.result);
+    }
+
+    @Override
+    public M getResult() {
+        return (M) this.result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
new file mode 100644
index 0000000..654969b
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
+
+    private final Actor.Master master;
+    private final Map<String, Address.Worker> workers = new HashMap<>();
+    private final Traversal.Admin<?, ?> traversal;
+    private final TraversalMatrix<?, ?> matrix;
+    private final Partitioner partitioner;
+    private Map<String, Barrier> barriers = new HashMap<>();
+    private final TraverserSet<?> results;
+    private final String leaderWorker;
+
+    public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
+        this.traversal = traversal;
+        System.out.println("master[created]: " + master.address().location());
+        System.out.println(this.traversal);
+        this.matrix = new TraversalMatrix<>(this.traversal);
+        this.partitioner = partitioner;
+        this.results = results;
+        this.master = master;
+        this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode();
+    }
+
+    @Override
+    public void setup() {
+        for (final Address.Worker worker : master.workers()) {
+            this.workers.put(worker.location(), worker);
+        }
+        this.broadcast(StartMessage.instance());
+
+    }
+
+    @Override
+    public void execute(final M message) {
+        if (message instanceof Traverser.Admin) {
+            this.processTraverser((Traverser.Admin) message);
+        } else if (message instanceof BarrierAddMessage) {
+            final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId());
+            final Step<?, ?> step = (Step) barrier;
+            GraphComputing.atMaster(step, true);
+            barrier.addBarrier(((BarrierAddMessage) message).getBarrier());
+            this.barriers.put(step.getId(), barrier);
+        } else if (message instanceof SideEffectAddMessage) {
+            this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue());
+        } else if (message instanceof VoteToHaltMessage) {
+            if (!this.barriers.isEmpty()) {
+                for (final Barrier barrier : this.barriers.values()) {
+                    final Step<?, ?> step = (Step) barrier;
+                    if (!(barrier instanceof LocalBarrier)) {
+                        while (step.hasNext()) {
+                            this.sendTraverser(step.next());
+                        }
+                    } else {
+                        this.traversal.getSideEffects().forEach((k, v) -> {
+                            this.broadcast(new SideEffectSetMessage(k, v));
+                        });
+                        this.broadcast(new BarrierDoneMessage(barrier));
+                        barrier.done();
+                    }
+                }
+                this.barriers.clear();
+                this.master.send(this.workers.get(this.leaderWorker), StartMessage.instance());
+            } else {
+                while (this.traversal.hasNext()) {
+                    this.results.add((Traverser.Admin) this.traversal.nextTraverser());
+                }
+                this.master.close();
+            }
+        } else {
+            throw new IllegalStateException("Unknown message:" + message);
+        }
+    }
+
+    @Override
+    public void terminate() {
+
+    }
+
+    private void broadcast(final Object message) {
+        for (final Address.Worker worker : this.workers.values()) {
+            this.master.send(worker, message);
+        }
+    }
+
+    private void processTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted() || traverser.get() instanceof Element) {
+            this.sendTraverser(traverser);
+        } else {
+            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+            GraphComputing.atMaster(step, true);
+            step.addStart(traverser);
+            while (step.hasNext()) {
+                this.processTraverser(step.next());
+            }
+        }
+    }
+
+    private void sendTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted())
+            this.results.add(traverser);
+        else if (traverser.get() instanceof Element)
+            this.master.send(this.workers.get("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser);
+        else
+            this.master.send(this.master.address(), traverser);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
new file mode 100644
index 0000000..58e06d6
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -0,0 +1,196 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
+
+    // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm)
+    public enum Terminate {
+        MAYBE, YES, NO
+    }
+
+    private final Actor.Worker self;
+    private final TraversalMatrix<?, ?> matrix;
+    private final Partition localPartition;
+    private final Partitioner partitioner;
+    //
+    private final Map<String, Address.Worker> workers = new HashMap<>();
+    private final String neighborWorker;
+    private boolean isLeader;
+    private Terminate terminate = null;
+    private boolean voteToHalt = false;
+    private Map<String, Barrier> barriers = new HashMap<>();
+
+    public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+        this.self = self;
+        System.out.println("worker[created]: " + this.self.address().location());
+        // set up partition and traversal information
+        this.localPartition = self.partition();
+        this.partitioner = partitioner;
+        final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
+        TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
+        this.matrix = new TraversalMatrix<>(traversal);
+        final GraphStep graphStep = (GraphStep) traversal.getStartStep();
+        if (0 == graphStep.getIds().length)
+            ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges);
+        else {
+            if (graphStep.returnsVertex())
+                ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
+                        () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains));
+            else
+                ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
+                        () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains));
+        }
+        // create termination ring topology
+        final int i = this.partitioner.getPartitions().indexOf(this.localPartition);
+        this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
+        this.isLeader = i == 0;
+        for (final Address.Worker worker : self.workers()) {
+            //if (!worker.equals(this.self.address()))
+            this.workers.put(worker.location(), worker);
+        }
+    }
+
+    @Override
+    public void setup() {
+
+    }
+
+    @Override
+    public void execute(final M message) {
+        //System.out.println(message + "::" + this.isLeader);
+        if (message instanceof StartMessage) {
+            // initial message from master that says: "start processing"
+            final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
+            while (step.hasNext()) {
+                this.sendTraverser(step.next());
+            }
+            // internal vote to have in mailbox as final message to process
+            // assert null == this.terminate;
+            if (this.isLeader) {
+                this.terminate = Terminate.MAYBE;
+                this.self.send(this.self.address(), VoteToHaltMessage.instance());
+            }
+        } else if (message instanceof Traverser.Admin) {
+            final Traverser.Admin<?> traverser = (Traverser.Admin) message;
+            this.processTraverser(traverser);
+
+        } else if (message instanceof SideEffectSetMessage) {
+            this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue());
+        } else if (message instanceof Terminate) {
+            // assert this.isLeader || this.terminate != Terminate.MAYBE;
+            this.terminate = (Terminate) message;
+            this.self.send(this.self.address(), VoteToHaltMessage.instance());
+        } else if (message instanceof VoteToHaltMessage) {
+            // if there is a barrier and thus, halting at barrier, then process barrier
+            if (!this.barriers.isEmpty()) {
+                for (final Barrier barrier : this.barriers.values()) {
+                    while (barrier.hasNextBarrier()) {
+                        this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+                    }
+                }
+                this.barriers.clear();
+                this.voteToHalt = false;
+            }
+            // use termination token to determine termination condition
+            if (null != this.terminate) {
+                if (this.isLeader) {
+                    if (this.voteToHalt && Terminate.YES == this.terminate)
+                        this.self.send(this.self.master(), VoteToHaltMessage.instance());
+                    else
+                        this.self.send(this.workers.get(this.neighborWorker), Terminate.YES);
+                } else
+                    this.self.send(this.workers.get(this.neighborWorker), this.voteToHalt ? this.terminate : Terminate.NO);
+                this.terminate = null;
+                this.voteToHalt = true;
+            }
+        } else if (message instanceof BarrierDoneMessage) {
+            final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId());
+            while (step.hasNext()) {
+                sendTraverser(step.next());
+            }
+        } else {
+            throw new IllegalArgumentException("The following message is unknown: " + message);
+        }
+    }
+
+    @Override
+    public void terminate() {
+
+    }
+
+    //////////////
+
+    private void processTraverser(final Traverser.Admin traverser) {
+        // assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
+        final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+        if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
+        GraphComputing.atMaster(step, false);
+        step.addStart(traverser);
+        if (step instanceof Barrier) {
+            this.barriers.put(step.getId(), (Barrier) step);
+        } else {
+            while (step.hasNext()) {
+                this.sendTraverser(step.next());
+            }
+        }
+    }
+
+    private void sendTraverser(final Traverser.Admin traverser) {
+        this.voteToHalt = false;
+        if (traverser.isHalted())
+            this.self.send(this.self.master(), traverser);
+        else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get()))
+            this.self.send(this.workers.get("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser);
+        else
+            this.self.send(this.self.address(), traverser);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
new file mode 100644
index 0000000..6ab66c4
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
@@ -0,0 +1,147 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerTraversalSideEffects implements TraversalSideEffects {
+
+    private TraversalSideEffects sideEffects;
+    private Actor.Worker worker;
+
+
+    private WorkerTraversalSideEffects() {
+        // for serialization
+    }
+
+    public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
+        this.sideEffects = sideEffects;
+        this.worker = worker;
+    }
+
+    public TraversalSideEffects getSideEffects() {
+        return this.sideEffects;
+    }
+
+    @Override
+    public void set(final String key, final Object value) {
+        this.sideEffects.set(key, value);
+    }
+
+    @Override
+    public <V> V get(final String key) throws IllegalArgumentException {
+        return this.sideEffects.get(key);
+    }
+
+    @Override
+    public void remove(final String key) {
+        this.sideEffects.remove(key);
+    }
+
+    @Override
+    public Set<String> keys() {
+        return this.sideEffects.keys();
+    }
+
+    @Override
+    public void add(final String key, final Object value) {
+        this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
+    }
+
+    @Override
+    public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.register(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.registerIfAbsent(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> BinaryOperator<V> getReducer(final String key) {
+        return this.sideEffects.getReducer(key);
+    }
+
+    @Override
+    public <V> Supplier<V> getSupplier(final String key) {
+        return this.sideEffects.getSupplier(key);
+    }
+
+    @Override
+    @Deprecated
+    public void registerSupplier(final String key, final Supplier supplier) {
+        this.sideEffects.registerSupplier(key, supplier);
+    }
+
+    @Override
+    @Deprecated
+    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+        return this.sideEffects.getRegisteredSupplier(key);
+    }
+
+    @Override
+    public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+        this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
+    }
+
+    @Override
+    public <S> Supplier<S> getSackInitialValue() {
+        return this.sideEffects.getSackInitialValue();
+    }
+
+    @Override
+    public <S> UnaryOperator<S> getSackSplitter() {
+        return this.sideEffects.getSackSplitter();
+    }
+
+    @Override
+    public <S> BinaryOperator<S> getSackMerger() {
+        return this.sideEffects.getSackMerger();
+    }
+
+    @Override
+    public TraversalSideEffects clone() {
+        try {
+            final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone();
+            clone.sideEffects = this.sideEffects.clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void mergeInto(final TraversalSideEffects sideEffects) {
+        this.sideEffects.mergeInto(sideEffects);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/53bab282/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
new file mode 100644
index 0000000..dba9b86
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierAddMessage {
+
+    private final Object barrier;
+    private final String stepId;
+
+    public BarrierAddMessage(final Barrier barrier) {
+        this.barrier = barrier.nextBarrier();
+        this.stepId = ((Step) barrier).getId();
+    }
+
+    public Object getBarrier() {
+        return this.barrier;
+    }
+
+    public String getStepId() {
+        return this.stepId;
+    }
+
+
+}