You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/10 16:11:25 UTC

[17/50] [abbrv] tinkerpop git commit: Okay. Mailboxes are controlled by ActorProgram.getMessagePriorities(). Took me forever to figure out how to dynamically configure Akka mailboxes.

Okay. Mailboxes are controlled by ActorProgram.getMessagePriorities(). It took me forever to figure out how to dynamically configure Akka mailboxes.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a7e7a0fe
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a7e7a0fe
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a7e7a0fe

Branch: refs/heads/TINKERPOP-1564
Commit: a7e7a0fe9a0a064acf80694376563e1fe52113d9
Parents: 1c342d5
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Dec 13 12:58:54 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 10 08:22:55 2017 -0700

----------------------------------------------------------------------
 .../gremlin/akka/process/actor/AkkaActors.java  |   9 +-
 .../gremlin/akka/process/actor/MasterActor.java |   1 +
 .../akka/process/actor/TraverserMailbox.java    | 104 ++++++++++---------
 .../gremlin/akka/process/actor/WorkerActor.java |   2 +-
 .../src/main/resources/application.conf         |   4 +
 .../tinkerpop/gremlin/process/actor/Actor.java  |   4 +
 .../gremlin/process/actor/ActorProgram.java     |   6 ++
 .../actor/traversal/TraversalActorProgram.java  |  27 +++++
 .../actor/traversal/TraversalMasterProgram.java |   6 +-
 .../actor/traversal/TraversalWorkerProgram.java |   8 +-
 .../traversal/WorkerTraversalSideEffects.java   |   1 +
 .../actor/traversal/message/Terminate.java      |  28 +++++
 12 files changed, 141 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
index de301c1..5faa8d6 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -21,6 +21,9 @@ package org.apache.tinkerpop.gremlin.akka.process.actor;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actor.Actors;
 import org.apache.tinkerpop.gremlin.process.actor.Address;
@@ -28,6 +31,7 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -40,7 +44,10 @@ public final class AkkaActors<R> implements Actors<R> {
 
     public AkkaActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) {
         this.actorProgram = actorProgram;
-        this.system = ActorSystem.create("traversal-" + actorProgram.hashCode());
+        final Config config = ConfigFactory.defaultApplication().
+                withValue("message-priorities",
+                        ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
+        this.system = ActorSystem.create("traversal-" + actorProgram.hashCode(), config);
         this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner), "master").path().toString());
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
index d7b45fa..8d61492 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -81,4 +81,5 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
     public void close() {
         context().system().terminate();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
index 671f3a2..8251ab4 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
@@ -26,13 +26,15 @@ import akka.dispatch.MailboxType;
 import akka.dispatch.MessageQueue;
 import akka.dispatch.ProducesMessageQueue;
 import com.typesafe.config.Config;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalWorkerProgram;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import scala.Option;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
 /**
@@ -40,78 +42,77 @@ import java.util.Queue;
  */
 public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
 
+    private final List<Class> messagePriorities = new ArrayList<>();
+
+
     public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
-        private final Queue<Envelope> otherMessages = new LinkedList<>();
-        private final TraverserSet<?> traverserMessages = new TraverserSet<>();
-        private Envelope haltMessage = null;
-        private Envelope terminateToken = null;
-        private final ActorRef owner;
+        private final List<Queue> messages;
+        private final Map<Class, Queue> priorities;
         private final Object MUTEX = new Object();
 
-        public TraverserMessageQueue(final ActorRef owner) {
-            this.owner = owner;
+        public TraverserMessageQueue(final List<Class> messagePriorities) {
+            this.messages = new ArrayList<>(messagePriorities.size());
+            this.priorities = new HashMap<>(messagePriorities.size());
+            for (final Class clazz : messagePriorities) {
+                final Queue queue;
+                if (Traverser.class.isAssignableFrom(clazz))
+                    queue = new TraverserSet<>();
+                else
+                    queue = new LinkedList<>();
+                this.messages.add(queue);
+                this.priorities.put(clazz, queue);
+            }
         }
 
         public void enqueue(final ActorRef receiver, final Envelope handle) {
             synchronized (MUTEX) {
-                if (handle.message() instanceof Traverser.Admin)
-                    this.traverserMessages.offer((Traverser.Admin) handle.message());
-                else if (handle.message() instanceof VoteToHaltMessage) {
-                    assert null == this.haltMessage;
-                    this.haltMessage = handle;
-                } else if (handle.message() instanceof TraversalWorkerProgram.Terminate) {
-                    assert null == this.terminateToken;
-                    this.terminateToken = handle;
-                } else
-                    this.otherMessages.offer(handle);
+                final Queue queue = this.priorities.get(Traverser.class.isAssignableFrom(handle.message().getClass()) ? Traverser.class : handle.message().getClass());
+                if (null == queue)
+                    throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
+                else
+                    queue.offer(handle.message() instanceof Traverser ? handle.message() : handle);
             }
         }
 
         public Envelope dequeue() {
             synchronized (MUTEX) {
-                if (!this.otherMessages.isEmpty())
-                    return this.otherMessages.poll();
-                if (!this.traverserMessages.isEmpty())
-                    return new Envelope(this.traverserMessages.poll(), this.owner);
-                else if (null != this.terminateToken) {
-                    final Envelope temp = this.terminateToken;
-                    this.terminateToken = null;
-                    return temp;
-                } else {
-                    final Envelope temp = this.haltMessage;
-                    this.haltMessage = null;
-                    return temp;
+                for (final Queue queue : this.messages) {
+                    if (!queue.isEmpty()) {
+                        final Object m = queue.poll();
+                        return m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m;
+                    }
                 }
+                return null;
             }
         }
 
         public int numberOfMessages() {
             synchronized (MUTEX) {
-                return this.otherMessages.size() + this.traverserMessages.size() + (null == this.haltMessage ? 0 : 1) + (null == this.terminateToken ? 0 : 1);
+                int counter = 0;
+                for (final Queue queue : this.messages) {
+                    counter = counter + queue.size();
+                }
+                return counter;
             }
         }
 
         public boolean hasMessages() {
             synchronized (MUTEX) {
-                return !this.otherMessages.isEmpty() || !this.traverserMessages.isEmpty() || null != this.haltMessage || this.terminateToken != null;
+                for (final Queue queue : this.messages) {
+                    if (!queue.isEmpty())
+                        return true;
+                }
+                return false;
             }
         }
 
         public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
             synchronized (MUTEX) {
-                for (final Envelope handle : this.otherMessages) {
-                    deadLetters.enqueue(owner, handle);
-                }
-                for (final Traverser.Admin<?> traverser : this.traverserMessages) {
-                    deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
-                }
-                if (null != this.haltMessage) {
-                    deadLetters.enqueue(owner, this.haltMessage);
-                    this.haltMessage = null;
-                }
-                if (null != this.terminateToken) {
-                    deadLetters.enqueue(owner, this.terminateToken);
-                    this.terminateToken = null;
+                for (final Queue queue : this.messages) {
+                    while (!queue.isEmpty()) {
+                        final Object m = queue.poll();
+                        deadLetters.enqueue(owner, m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m);
+                    }
                 }
             }
         }
@@ -119,12 +120,19 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
 
     // This constructor signature must exist, it will be called by Akka
     public TraverserMailbox(final ActorSystem.Settings settings, final Config config) {
-        // put your initialization code here
+        try {
+            final String[] messages = ((String) settings.config().getAnyRef("message-priorities")).replace("[", "").replace("]", "").split(",");
+            for (final String clazz : messages) {
+                this.messagePriorities.add(Class.forName(clazz.trim()));
+            }
+        } catch (final ClassNotFoundException e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
     }
 
     // The create method is called to create the MessageQueue
     public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) {
-        return new TraverserMessageQueue(owner.isEmpty() ? ActorRef.noSender() : owner.get());
+        return new TraverserMessageQueue(this.messagePriorities);
     }
 
     public static interface TraverserSetSemantics {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
index 84dbe37..c023312 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -53,7 +53,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
         for (final Partition partition : partitioner.getPartitions()) {
             this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
         }
-        ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
+        final ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
         receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
         workerProgram.setup();
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index 706e3a6..a9bda12 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -6,6 +6,10 @@ akka.actor.mailbox.requirements {
   "org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox$TraverserSetSemantics" = custom-dispatcher-mailbox
 }
 
+akka {
+  log-dead-letters-during-shutdown = "false"
+}
+
 custom-dispatcher-mailbox {
   mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
index 2552883..ed627de 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
@@ -32,6 +32,8 @@ public interface Actor {
 
     public <M> void send(final Address toActor, final M message);
 
+
+
     public interface Master extends Actor {
 
         public List<Address.Worker> workers();
@@ -40,6 +42,7 @@ public interface Actor {
 
         public void close();
 
+
     }
 
     public interface Worker extends Actor {
@@ -51,6 +54,7 @@ public interface Actor {
         public List<Address.Worker> workers();
 
         public Partition partition();
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
index b8f7ac1..3ae54d1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
@@ -19,6 +19,8 @@
 
 package org.apache.tinkerpop.gremlin.process.actor;
 
+import java.util.List;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -28,8 +30,11 @@ public interface ActorProgram<M> {
 
     public Master createMasterProgram(final Actor.Master master);
 
+    public List<Class> getMessagePriorities();
+
     public M getResult();
 
+
     public static interface Worker<M> {
         public void setup();
 
@@ -45,6 +50,7 @@ public interface ActorProgram<M> {
         public void execute(final M message);
 
         public void terminate();
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
index e72b989..88eb670 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -21,20 +21,41 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal;
 
 import org.apache.tinkerpop.gremlin.process.actor.Actor;
 import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet<R>> {
 
+    private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList(
+            StartMessage.class,
+            Traverser.class,
+            SideEffectAddMessage.class,
+            BarrierAddMessage.class,
+            SideEffectSetMessage.class,
+            BarrierDoneMessage.class,
+            Terminate.class,
+            VoteToHaltMessage.class);
+
     private final Traversal.Admin<?, R> traversal;
     private final Partitioner partitioner;
     public TraverserSet<R> result = new TraverserSet<>();
@@ -60,6 +81,12 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet
     }
 
     @Override
+    public List<Class> getMessagePriorities() {
+        return MESSAGE_PRIORITIES;
+    }
+
+
+    @Override
     public TraverserSet<R> getResult() {
         return this.result;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index ba051e2..45fb6b9 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -39,13 +39,15 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
+final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
 
     private final Actor.Master master;
     private final Traversal.Admin<?, ?> traversal;
@@ -144,6 +146,4 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
         else
             this.master.send(this.master.address(), traverser);
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index 4275caa..898e191 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMe
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -49,12 +50,8 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
+final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
 
-    // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm)
-    public enum Terminate {
-        MAYBE, YES, NO
-    }
 
     private final Actor.Worker self;
     private final TraversalMatrix<?, ?> matrix;
@@ -186,5 +183,4 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
         else
             this.self.send(this.self.address(), traverser);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
index 6ab66c4..0950435 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
@@ -73,6 +73,7 @@ public final class WorkerTraversalSideEffects implements TraversalSideEffects {
 
     @Override
     public void add(final String key, final Object value) {
+        this.sideEffects.add(key, value);
         this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a7e7a0fe/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
new file mode 100644
index 0000000..4ab789d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
@@ -0,0 +1,28 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum Terminate {
+
+    MAYBE, YES, NO
+}