You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/13 19:58:59 UTC
tinkerpop git commit: okay. mailboxes are controlled by
ActorProgram.getMessagePriorities(). Took me forever to figure out how to
dynamically configure akka mailboxes.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 23c48cf2f -> 4dd256a38
okay. mailboxes are controlled by ActorProgram.getMessagePriorities(). Took me forever to figure out how to dynamically configure akka mailboxes.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/4dd256a3
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/4dd256a3
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/4dd256a3
Branch: refs/heads/TINKERPOP-1564
Commit: 4dd256a38b6f321c0b5d69f8797c6ecf028a285a
Parents: 23c48cf
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Dec 13 12:58:54 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Dec 13 12:58:54 2016 -0700
----------------------------------------------------------------------
.../gremlin/akka/process/actor/AkkaActors.java | 9 +-
.../gremlin/akka/process/actor/MasterActor.java | 1 +
.../akka/process/actor/TraverserMailbox.java | 104 ++++++++++---------
.../gremlin/akka/process/actor/WorkerActor.java | 2 +-
.../src/main/resources/application.conf | 4 +
.../tinkerpop/gremlin/process/actor/Actor.java | 4 +
.../gremlin/process/actor/ActorProgram.java | 6 ++
.../actor/traversal/TraversalActorProgram.java | 27 +++++
.../actor/traversal/TraversalMasterProgram.java | 6 +-
.../actor/traversal/TraversalWorkerProgram.java | 8 +-
.../traversal/WorkerTraversalSideEffects.java | 1 +
.../actor/traversal/message/Terminate.java | 28 +++++
12 files changed, 141 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
index de301c1..5faa8d6 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -21,6 +21,9 @@ package org.apache.tinkerpop.gremlin.akka.process.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actor.Actors;
import org.apache.tinkerpop.gremlin.process.actor.Address;
@@ -28,6 +31,7 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -40,7 +44,10 @@ public final class AkkaActors<R> implements Actors<R> {
public AkkaActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) {
this.actorProgram = actorProgram;
- this.system = ActorSystem.create("traversal-" + actorProgram.hashCode());
+ final Config config = ConfigFactory.defaultApplication().
+ withValue("message-priorities",
+ ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
+ this.system = ActorSystem.create("traversal-" + actorProgram.hashCode(), config);
this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner), "master").path().toString());
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
index d7b45fa..8d61492 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -81,4 +81,5 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
public void close() {
context().system().terminate();
}
+
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
index 671f3a2..8251ab4 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
@@ -26,13 +26,15 @@ import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalWorkerProgram;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import scala.Option;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Queue;
/**
@@ -40,78 +42,77 @@ import java.util.Queue;
*/
public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
+ private final List<Class> messagePriorities = new ArrayList<>();
+
+
public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
- private final Queue<Envelope> otherMessages = new LinkedList<>();
- private final TraverserSet<?> traverserMessages = new TraverserSet<>();
- private Envelope haltMessage = null;
- private Envelope terminateToken = null;
- private final ActorRef owner;
+ private final List<Queue> messages;
+ private final Map<Class, Queue> priorities;
private final Object MUTEX = new Object();
- public TraverserMessageQueue(final ActorRef owner) {
- this.owner = owner;
+ public TraverserMessageQueue(final List<Class> messagePriorities) {
+ this.messages = new ArrayList<>(messagePriorities.size());
+ this.priorities = new HashMap<>(messagePriorities.size());
+ for (final Class clazz : messagePriorities) {
+ final Queue queue;
+ if (Traverser.class.isAssignableFrom(clazz))
+ queue = new TraverserSet<>();
+ else
+ queue = new LinkedList<>();
+ this.messages.add(queue);
+ this.priorities.put(clazz, queue);
+ }
}
public void enqueue(final ActorRef receiver, final Envelope handle) {
synchronized (MUTEX) {
- if (handle.message() instanceof Traverser.Admin)
- this.traverserMessages.offer((Traverser.Admin) handle.message());
- else if (handle.message() instanceof VoteToHaltMessage) {
- assert null == this.haltMessage;
- this.haltMessage = handle;
- } else if (handle.message() instanceof TraversalWorkerProgram.Terminate) {
- assert null == this.terminateToken;
- this.terminateToken = handle;
- } else
- this.otherMessages.offer(handle);
+ final Queue queue = this.priorities.get(Traverser.class.isAssignableFrom(handle.message().getClass()) ? Traverser.class : handle.message().getClass());
+ if (null == queue)
+ throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
+ else
+ queue.offer(handle.message() instanceof Traverser ? handle.message() : handle);
}
}
public Envelope dequeue() {
synchronized (MUTEX) {
- if (!this.otherMessages.isEmpty())
- return this.otherMessages.poll();
- if (!this.traverserMessages.isEmpty())
- return new Envelope(this.traverserMessages.poll(), this.owner);
- else if (null != this.terminateToken) {
- final Envelope temp = this.terminateToken;
- this.terminateToken = null;
- return temp;
- } else {
- final Envelope temp = this.haltMessage;
- this.haltMessage = null;
- return temp;
+ for (final Queue queue : this.messages) {
+ if (!queue.isEmpty()) {
+ final Object m = queue.poll();
+ return m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m;
+ }
}
+ return null;
}
}
public int numberOfMessages() {
synchronized (MUTEX) {
- return this.otherMessages.size() + this.traverserMessages.size() + (null == this.haltMessage ? 0 : 1) + (null == this.terminateToken ? 0 : 1);
+ int counter = 0;
+ for (final Queue queue : this.messages) {
+ counter = counter + queue.size();
+ }
+ return counter;
}
}
public boolean hasMessages() {
synchronized (MUTEX) {
- return !this.otherMessages.isEmpty() || !this.traverserMessages.isEmpty() || null != this.haltMessage || this.terminateToken != null;
+ for (final Queue queue : this.messages) {
+ if (!queue.isEmpty())
+ return true;
+ }
+ return false;
}
}
public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
synchronized (MUTEX) {
- for (final Envelope handle : this.otherMessages) {
- deadLetters.enqueue(owner, handle);
- }
- for (final Traverser.Admin<?> traverser : this.traverserMessages) {
- deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
- }
- if (null != this.haltMessage) {
- deadLetters.enqueue(owner, this.haltMessage);
- this.haltMessage = null;
- }
- if (null != this.terminateToken) {
- deadLetters.enqueue(owner, this.terminateToken);
- this.terminateToken = null;
+ for (final Queue queue : this.messages) {
+ while (!queue.isEmpty()) {
+ final Object m = queue.poll();
+ deadLetters.enqueue(owner, m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m);
+ }
}
}
}
@@ -119,12 +120,19 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
// This constructor signature must exist, it will be called by Akka
public TraverserMailbox(final ActorSystem.Settings settings, final Config config) {
- // put your initialization code here
+ try {
+ final String[] messages = ((String) settings.config().getAnyRef("message-priorities")).replace("[", "").replace("]", "").split(",");
+ for (final String clazz : messages) {
+ this.messagePriorities.add(Class.forName(clazz.trim()));
+ }
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
}
// The create method is called to create the MessageQueue
public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) {
- return new TraverserMessageQueue(owner.isEmpty() ? ActorRef.noSender() : owner.get());
+ return new TraverserMessageQueue(this.messagePriorities);
}
public static interface TraverserSetSemantics {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
index 84dbe37..c023312 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -53,7 +53,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
for (final Partition partition : partitioner.getPartitions()) {
this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
}
- ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
+ final ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
workerProgram.setup();
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index 706e3a6..a9bda12 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -6,6 +6,10 @@ akka.actor.mailbox.requirements {
"org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox$TraverserSetSemantics" = custom-dispatcher-mailbox
}
+akka {
+ log-dead-letters-during-shutdown = "false"
+}
+
custom-dispatcher-mailbox {
mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actor.TraverserMailbox"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
index 2552883..ed627de 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
@@ -32,6 +32,8 @@ public interface Actor {
public <M> void send(final Address toActor, final M message);
+
+
public interface Master extends Actor {
public List<Address.Worker> workers();
@@ -40,6 +42,7 @@ public interface Actor {
public void close();
+
}
public interface Worker extends Actor {
@@ -51,6 +54,7 @@ public interface Actor {
public List<Address.Worker> workers();
public Partition partition();
+
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
index b8f7ac1..3ae54d1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
@@ -19,6 +19,8 @@
package org.apache.tinkerpop.gremlin.process.actor;
+import java.util.List;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -28,8 +30,11 @@ public interface ActorProgram<M> {
public Master createMasterProgram(final Actor.Master master);
+ public List<Class> getMessagePriorities();
+
public M getResult();
+
public static interface Worker<M> {
public void setup();
@@ -45,6 +50,7 @@ public interface ActorProgram<M> {
public void execute(final M message);
public void terminate();
+
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
index e72b989..88eb670 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -21,20 +21,41 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal;
import org.apache.tinkerpop.gremlin.process.actor.Actor;
import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import java.util.Arrays;
+import java.util.List;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet<R>> {
+ private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList(
+ StartMessage.class,
+ Traverser.class,
+ SideEffectAddMessage.class,
+ BarrierAddMessage.class,
+ SideEffectSetMessage.class,
+ BarrierDoneMessage.class,
+ Terminate.class,
+ VoteToHaltMessage.class);
+
private final Traversal.Admin<?, R> traversal;
private final Partitioner partitioner;
public TraverserSet<R> result = new TraverserSet<>();
@@ -60,6 +81,12 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet
}
@Override
+ public List<Class> getMessagePriorities() {
+ return MESSAGE_PRIORITIES;
+ }
+
+
+ @Override
public TraverserSet<R> getResult() {
return this.result;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index ba051e2..45fb6b9 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -39,13 +39,15 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
+final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
private final Actor.Master master;
private final Traversal.Admin<?, ?> traversal;
@@ -144,6 +146,4 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
else
this.master.send(this.master.address(), traverser);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index 4275caa..898e191 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMe
import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -49,12 +50,8 @@ import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
+final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
- // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm)
- public enum Terminate {
- MAYBE, YES, NO
- }
private final Actor.Worker self;
private final TraversalMatrix<?, ?> matrix;
@@ -186,5 +183,4 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
else
this.self.send(this.self.address(), traverser);
}
-
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
index 6ab66c4..0950435 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
@@ -73,6 +73,7 @@ public final class WorkerTraversalSideEffects implements TraversalSideEffects {
@Override
public void add(final String key, final Object value) {
+ this.sideEffects.add(key, value);
this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/4dd256a3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
new file mode 100644
index 0000000..4ab789d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum Terminate {
+
+ MAYBE, YES, NO
+}