You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/07 20:53:12 UTC

tinkerpop git commit: added more test traversals to demonstrate cool features. Cleaned this up a bit. Added a new StartSynchronizationMessage.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 c80abcf37 -> 3063b99b7


added more test traversals to demonstrate cool features. Cleaned this up a bit. Added a new StartSynchronizationMessage.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3063b99b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3063b99b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3063b99b

Branch: refs/heads/TINKERPOP-1564
Commit: 3063b99b7521de30965edc3f744d85cdba2f6c36
Parents: c80abcf
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 7 13:53:09 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 7 13:53:09 2016 -0700

----------------------------------------------------------------------
 .../akka/ComputerActivationStrategy.java        | 54 ++++++++++++++++++++
 .../akka/DistributedTraversalSideEffects.java   |  9 +---
 .../process/akka/MasterTraversalActor.java      | 33 +++++++-----
 .../process/akka/TinkerActorSystem.java         | 45 ++++++++++++----
 .../process/akka/TraverserMailbox.java          |  4 +-
 .../process/akka/WorkerTraversalActor.java      | 49 +++++++++++-------
 .../akka/messages/BarrierMergeMessage.java      | 47 +++++++++++++++++
 .../process/akka/messages/BarrierMessage.java   | 47 -----------------
 .../akka/messages/SideEffectMergeMessage.java   | 42 +++++++++++++++
 .../akka/messages/SideEffectMessage.java        | 53 -------------------
 .../messages/StartSynchronizationMessage.java   | 32 ++++++++++++
 11 files changed, 265 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/ComputerActivationStrategy.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/ComputerActivationStrategy.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/ComputerActivationStrategy.java
new file mode 100644
index 0000000..35f2bb9
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/ComputerActivationStrategy.java
@@ -0,0 +1,54 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ComputerActivationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy {
+
+    private static final ComputerActivationStrategy INSTANCE = new ComputerActivationStrategy();
+
+    private ComputerActivationStrategy() {
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        if (!TraversalHelper.onGraphComputer(traversal))
+            return;
+        final boolean globalChild = TraversalHelper.isGlobalChild(traversal);
+        for (final Step<?, ?> step : traversal.getSteps()) {
+            // only global children are graph computing
+            if (globalChild && step instanceof GraphComputing)
+                ((GraphComputing) step).onGraphComputer();
+        }
+    }
+
+    public static ComputerActivationStrategy instance() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
index ceacdd3..8b29a08 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
@@ -20,13 +20,8 @@
 package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.ActorContext;
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
 
 import java.util.Optional;
 import java.util.Set;
@@ -78,7 +73,7 @@ public final class DistributedTraversalSideEffects implements TraversalSideEffec
 
     @Override
     public void add(final String key, final Object value) {
-        this.context.parent().tell(new SideEffectMessage(key, value), this.context.self());
+        this.context.parent().tell(new SideEffectMergeMessage(key, value), this.context.self());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 824c7ca..94d9c27 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -22,22 +22,27 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 import akka.actor.AbstractActor;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.RequiresMessageQueue;
 import akka.japi.pf.ReceiveBuilder;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMergeMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartSynchronizationMessage;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -54,11 +59,15 @@ public final class MasterTraversalActor extends AbstractActor implements Require
     private final Traversal.Admin<?, ?> traversal;
     private final TraversalMatrix<?, ?> matrix;
     private final Partitioner partitioner;
-    private List<ActorPath> workers;
+    private List<ActorSelection> workers;
     private final Map<String, Set<ActorPath>> synchronizationLocks = new HashMap<>();
 
     public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
         System.out.println("master[created]: " + self().path());
+        final TraversalStrategies strategies = traversal.getStrategies().clone();
+        strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class);
+        strategies.addStrategies(ComputerActivationStrategy.instance());
+        traversal.setStrategies(strategies);
         traversal.applyStrategies();
         this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
         this.matrix = new TraversalMatrix<>(this.traversal);
@@ -69,7 +78,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                 match(Traverser.Admin.class, traverser -> {
                     this.processTraverser(traverser);
                 }).
-                match(BarrierMessage.class, barrier -> {
+                match(BarrierMergeMessage.class, barrier -> {
                     final Barrier barrierStep = ((Barrier) this.matrix.getStepById(barrier.getStepId()));
                     barrierStep.addBarrier(barrier.getBarrier());
                     broadcast(new BarrierSynchronizationMessage(barrierStep, true));
@@ -89,9 +98,9 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                         }
                     }
                 }).
-                match(SideEffectMessage.class, sideEffect -> {
-                    this.traversal.getSideEffects().add(sideEffect.getKey(),sideEffect.getValue());
-                    //this.broadcast(new SideEffectMessage(sideEffect.getKey(), sideEffect.getValue()));
+                match(SideEffectMergeMessage.class, sideEffect -> {
+                    this.traversal.getSideEffects().add(sideEffect.getKey(), sideEffect.getValue());
+                    //this.broadcast(new SideEffectMergeMessage(sideEffect.getKey(), sideEffect.getValue()));
                 }).
                 match(HaltSynchronizationMessage.class, haltSync -> {
                     Set<ActorPath> locks = this.synchronizationLocks.get(Traverser.Admin.HALT);
@@ -115,16 +124,16 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         this.workers = new ArrayList<>(partitions.size());
         for (final Partition partition : partitions) {
             final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), "worker-" + partition.hashCode());
-            this.workers.add(worker.path());
+            this.workers.add(context().actorSelection(worker.path()));
         }
-        for (final ActorPath worker : this.workers) {
-            context().actorSelection(worker).tell(TinkerActorSystem.State.START, self());
+        for (final ActorSelection worker : this.workers) {
+            worker.tell(StartSynchronizationMessage.instance(), self());
         }
     }
 
     private void broadcast(final Object message) {
-        for (final ActorPath worker : this.workers) {
-            context().actorSelection(worker).tell(message, self());
+        for (final ActorSelection worker : this.workers) {
+            worker.tell(message, self());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index bf3318c..b60f267 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -21,40 +21,65 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 import org.apache.tinkerpop.gremlin.structure.util.HashPartitioner;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.as;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.both;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class TinkerActorSystem {
 
-    public static enum State {START}
-
     private final ActorSystem system;
 
     public TinkerActorSystem(final Traversal.Admin<?, ?> traversal) {
         this.system = ActorSystem.create("traversal-" + traversal.hashCode());
-        this.system.actorOf(Props.create(MasterTraversalActor.class, traversal, new HashPartitioner(traversal.getGraph().get().partitioner(), 10)), "master");
+        this.system.actorOf(Props.create(MasterTraversalActor.class, traversal, new HashPartitioner(traversal.getGraph().get().partitioner(), 3)), "master");
     }
 
     //////////////
 
     public static void main(String args[]) throws Exception {
         final Graph graph = TinkerGraph.open();
-        graph.io(GryoIo.build()).readGraph("data/grateful-dead.kryo");
-        /*final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().match(
-                as("a").out().as("b"),
-                as("b").in().as("c"),
-                as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin();*/
-        final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount("a").by("name").cap("a").select(Column.keys).unfold().limit(10).asAdmin();
-        new TinkerActorSystem(traversal);
+        graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
+        final GraphTraversalSource g = graph.traversal().withComputer();
+        final List<Traversal.Admin<?, ?>> traversals = Arrays.asList(
+                // match() works
+                g.V().match(
+                        as("a").out("created").as("b"),
+                        as("b").in("created").as("c"),
+                        as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin(),
+                // side-effects work
+                g.V().repeat(both()).times(2).
+                        groupCount("a").by("name").
+                        cap("a").
+                        select(Column.keys).unfold().limit(3).asAdmin(),
+                // barries work and beyond the local star graph works
+                g.V().repeat(both()).times(2).hasLabel("person").
+                        group().
+                        by("name").
+                        by(out("created").values("name").dedup().fold()).asAdmin()
+        );
+        for (final Traversal.Admin<?, ?> traversal : traversals) {
+            System.out.println("EXECUTING: " + traversal.getBytecode());
+            final TinkerActorSystem actors = new TinkerActorSystem(traversal);
+            while (!actors.system.isTerminated()) {
+
+            }
+            System.out.println("//////////////////////////////////\n");
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 69716a6..e0f84dc 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -30,7 +30,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
 import scala.Option;
 
 import java.util.Queue;
@@ -57,7 +57,7 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
         public void enqueue(final ActorRef receiver, final Envelope handle) {
             if (handle.message() instanceof Traverser.Admin)
                 this.traverserSet.offer((Traverser.Admin) handle.message());
-            else if (handle.message() instanceof SideEffectMessage)
+            else if (handle.message() instanceof SideEffectMergeMessage)
                 this.queue.offer(handle);
             else if (handle.message() instanceof BarrierSynchronizationMessage)
                 this.barrierSyncs.offer(handle);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 96cf6dd..20475ef 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -20,22 +20,27 @@
 package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.AbstractActor;
-import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.dispatch.RequiresMessageQueue;
 import akka.japi.pf.ReceiveBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMergeMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartSynchronizationMessage;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -44,23 +49,21 @@ public final class WorkerTraversalActor extends AbstractActor implements
         RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
 
     private final TraversalMatrix<?, ?> matrix;
-    private final Partition partition;
+    private final Partition localPartition;
     private final Partitioner partitioner;
     private boolean sentHaltMessage = false;
+    private final Map<String, ActorSelection> workers = new HashMap<>();
 
-
-    public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition partition, final Partitioner partitioner) {
+    public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
         System.out.println("worker[created]: " + self().path());
         this.matrix = new TraversalMatrix<>(traversal);
-
-        this.partition = partition;
+        this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+        this.localPartition = localPartition;
         this.partitioner = partitioner;
-        ((GraphStep) traversal.getStartStep()).setIteratorSupplier(partition::vertices);
-
+        ((GraphStep) traversal.getStartStep()).setIteratorSupplier(localPartition::vertices);
 
         receive(ReceiveBuilder.
-                match(TinkerActorSystem.State.class, state -> {
-                    this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+                match(StartSynchronizationMessage.class, start -> {
                     final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
                     while (step.hasNext()) {
                         this.processTraverser(step.next());
@@ -68,13 +71,13 @@ public final class WorkerTraversalActor extends AbstractActor implements
                 }).
                 match(BarrierSynchronizationMessage.class, barrierSync -> {
                     final Barrier barrier = this.matrix.getStepById(barrierSync.getStepId());
-                    if(barrierSync.getLock()) {
+                    if (barrierSync.getLock()) {
                         this.processBarrier(barrier);
                     } else {
                         barrier.done();
                     }
                 }).
-                match(SideEffectMessage.class, sideEffect -> {
+                match(SideEffectMergeMessage.class, sideEffect -> {
                     // TODO: sideEffect.setSideEffect(this.matrix.getTraversal());
                 }).
                 match(HaltSynchronizationMessage.class, haltSync -> {
@@ -94,14 +97,20 @@ public final class WorkerTraversalActor extends AbstractActor implements
     private void processTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted())
             context().parent().tell(traverser, self());
-        else if (traverser.get() instanceof Element && !this.partition.contains((Element) traverser.get())) {
+        else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
             final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
-            context().actorSelection("../worker-" + otherPartition.hashCode()).tell(traverser, self());
+            final String workerPathString = "../worker-" + otherPartition.hashCode();
+            ActorSelection worker = this.workers.get(workerPathString);
+            if (null == worker) {
+                worker = context().actorSelection(workerPathString);
+                this.workers.put(workerPathString, worker);
+            }
+            worker.tell(traverser, self());
         } else {
             final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
             step.addStart(traverser);
             if (step instanceof Barrier) {
-                this.processBarrier((Barrier)step);
+                this.processBarrier((Barrier) step);
             } else {
                 while (step.hasNext()) {
                     this.processTraverser(step.next());
@@ -111,8 +120,10 @@ public final class WorkerTraversalActor extends AbstractActor implements
     }
 
     private void processBarrier(final Barrier barrier) {
-        while(barrier.hasNextBarrier()) {
-            context().parent().tell(new BarrierMessage(barrier), self());
+        if (barrier instanceof Bypassing)
+            ((Bypassing) barrier).setBypass(true);
+        while (barrier.hasNextBarrier()) {
+            context().parent().tell(new BarrierMergeMessage(barrier), self());
         }
         context().parent().tell(new BarrierSynchronizationMessage(barrier, true), self());
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
new file mode 100644
index 0000000..55e3e70
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierMergeMessage {
+
+    private final Object barrier;
+    private final String stepId;
+
+    public BarrierMergeMessage(final Barrier barrier) {
+        this.barrier = barrier.nextBarrier();
+        this.stepId = ((Step) barrier).getId();
+    }
+
+    public Object getBarrier() {
+        return this.barrier;
+    }
+
+    public String getStepId() {
+        return this.stepId;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMessage.java
deleted file mode 100644
index 211d227..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierMessage {
-
-    private final Object barrier;
-    private final String stepId;
-
-    public BarrierMessage(final Barrier barrier) {
-        this.barrier = barrier.nextBarrier();
-        this.stepId = ((Step) barrier).getId();
-    }
-
-    public Object getBarrier() {
-        return this.barrier;
-    }
-
-    public String getStepId() {
-        return this.stepId;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
new file mode 100644
index 0000000..1cb6866
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectMergeMessage {
+
+    private final String sideEffectKey;
+    private final Object sideEffect;
+
+    public SideEffectMergeMessage(final String sideEffectKey, final Object sideEffect) {
+        this.sideEffect = sideEffect;
+        this.sideEffectKey = sideEffectKey;
+    }
+
+    public String getKey() {
+        return this.sideEffectKey;
+    }
+
+    public Object getValue() {
+        return this.sideEffect;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
deleted file mode 100644
index 5b8685c..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectMessage {
-
-    private final String sideEffectKey;
-    private final Object sideEffect;
-
-    public SideEffectMessage(final String sideEffectKey, final Object sideEffect) {
-        this.sideEffect = sideEffect;
-        this.sideEffectKey = sideEffectKey;
-    }
-
-    public void setSideEffect(final Traversal.Admin<?, ?> traversal) {
-        traversal.getSideEffects().set(this.sideEffectKey, this.sideEffect);
-    }
-
-    public void addSideEffect(final Traversal.Admin<?, ?> traversal) {
-        System.out.println(traversal.getSideEffects().getClass());
-        traversal.getSideEffects().add(this.sideEffectKey, this.sideEffect);
-    }
-
-    public String getKey() {
-        return this.sideEffectKey;
-    }
-
-    public Object getValue() {
-        return this.sideEffect;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
new file mode 100644
index 0000000..bab03f5
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
@@ -0,0 +1,32 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StartSynchronizationMessage {
+
+    private static final StartSynchronizationMessage INSTANCE = new StartSynchronizationMessage();
+
+    public static StartSynchronizationMessage instance() {
+        return INSTANCE;
+    }
+}