You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/08 17:52:41 UTC
tinkerpop git commit: Lots of stuff learned. I had a bug in my
barrier locking. I have a better model now where locks are released at
VoteToHalt. Cleaned up code -- reduced complexity. Better message names to
line up with process API semantics.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 3063b99b7 -> 8ef0897f9
Lots of stuff learned. I had a bug in my barrier locking. I have a better model now where locks are released at VoteToHalt. Cleaned up code -- reduced complexity. Better message names to line up with process API semantics.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8ef0897f
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8ef0897f
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8ef0897f
Branch: refs/heads/TINKERPOP-1564
Commit: 8ef0897f9565f5cb64f5e48506ed4c54605a6021
Parents: 3063b99
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Dec 8 10:52:36 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Dec 8 10:52:36 2016 -0700
----------------------------------------------------------------------
.../process/traversal/util/TraversalMatrix.java | 5 +
.../akka/DistributedTraversalSideEffects.java | 4 +-
.../process/akka/MasterTraversalActor.java | 109 +++++++++----------
.../process/akka/TinkerActorSystem.java | 13 ++-
.../process/akka/TraverserMailbox.java | 12 +-
.../process/akka/WorkerTraversalActor.java | 104 ++++++++++--------
.../akka/messages/BarrierAddMessage.java | 47 ++++++++
.../akka/messages/BarrierDoneMessage.java | 40 +++++++
.../akka/messages/BarrierMergeMessage.java | 47 --------
.../messages/BarrierSynchronizationMessage.java | 45 --------
.../messages/HaltSynchronizationMessage.java | 36 ------
.../akka/messages/SideEffectAddMessage.java | 42 +++++++
.../akka/messages/SideEffectMergeMessage.java | 42 -------
.../process/akka/messages/StartMessage.java | 32 ++++++
.../messages/StartSynchronizationMessage.java | 32 ------
.../akka/messages/VoteToHaltMessage.java | 36 ++++++
16 files changed, 332 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java
index 5b0816d..843a719 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalMatrix.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
/**
* A TraversalMatrix provides random, non-linear access to the steps of a traversal by their step id.
@@ -45,6 +46,10 @@ public final class TraversalMatrix<S, E> {
return (C) this.matrix.get(stepId);
}
+ public Set<String> getStepIds() {
+ return this.matrix.keySet();
+ }
+
public Traversal.Admin<S, E> getTraversal() {
return this.traversal;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
index 8b29a08..61bff7c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
@@ -21,7 +21,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.ActorContext;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
import java.util.Optional;
import java.util.Set;
@@ -73,7 +73,7 @@ public final class DistributedTraversalSideEffects implements TraversalSideEffec
@Override
public void add(final String key, final Object value) {
- this.context.parent().tell(new SideEffectMergeMessage(key, value), this.context.self());
+ this.context.parent().tell(new SideEffectAddMessage(key, value), this.context.self());
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 94d9c27..20d770a 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -38,17 +38,15 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMergeMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
/**
@@ -60,7 +58,8 @@ public final class MasterTraversalActor extends AbstractActor implements Require
private final TraversalMatrix<?, ?> matrix;
private final Partitioner partitioner;
private List<ActorSelection> workers;
- private final Map<String, Set<ActorPath>> synchronizationLocks = new HashMap<>();
+ private final Set<ActorPath> haltSynchronization = new HashSet<>();
+ private Barrier barrierLock = null;
public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
System.out.println("master[created]: " + self().path());
@@ -78,44 +77,37 @@ public final class MasterTraversalActor extends AbstractActor implements Require
match(Traverser.Admin.class, traverser -> {
this.processTraverser(traverser);
}).
- match(BarrierMergeMessage.class, barrier -> {
- final Barrier barrierStep = ((Barrier) this.matrix.getStepById(barrier.getStepId()));
- barrierStep.addBarrier(barrier.getBarrier());
- broadcast(new BarrierSynchronizationMessage(barrierStep, true));
+ match(BarrierAddMessage.class, barrierMerge -> {
+ final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
+ assert null == this.barrierLock || this.barrierLock == barrier;
+ this.barrierLock = barrier;
+ this.barrierLock.addBarrier(barrierMerge.getBarrier());
}).
- match(BarrierSynchronizationMessage.class, barrierSync -> {
- Set<ActorPath> locks = this.synchronizationLocks.get(barrierSync.getStepId());
- if (null == locks) {
- locks = new HashSet<>();
- this.synchronizationLocks.put(barrierSync.getStepId(), locks);
- }
- locks.add(sender().path());
- if (locks.size() == this.workers.size()) {
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(barrierSync.getStepId());
- this.broadcast(new BarrierSynchronizationMessage((Barrier) step, false));
- while (step.hasNext()) {
- this.processTraverser(step.next());
- }
- }
- }).
- match(SideEffectMergeMessage.class, sideEffect -> {
- this.traversal.getSideEffects().add(sideEffect.getKey(), sideEffect.getValue());
- //this.broadcast(new SideEffectMergeMessage(sideEffect.getKey(), sideEffect.getValue()));
+ match(SideEffectAddMessage.class, sideEffect -> {
+ this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
+ //this.broadcast(new SideEffectAddMessage(sideEffect.getKey(), sideEffect.getValue()));
}).
- match(HaltSynchronizationMessage.class, haltSync -> {
- Set<ActorPath> locks = this.synchronizationLocks.get(Traverser.Admin.HALT);
- if (null == locks) {
- locks = new HashSet<>();
- this.synchronizationLocks.put(Traverser.Admin.HALT, locks);
- }
- if (haltSync.isHalt()) {
- locks.add(sender().path());
- if (locks.size() == this.workers.size())
- context().system().terminate();
- } else {
- locks.remove(sender().path());
- this.broadcast(new HaltSynchronizationMessage(true));
- }
+ match(VoteToHaltMessage.class, haltSync -> {
+ // receive vote to halt messages from worker
+ // when all workers have voted to halt then terminate the system
+ if (haltSync.voteToHalt()) {
+ this.haltSynchronization.add(sender().path());
+ if (this.haltSynchronization.size() == this.workers.size()) {
+ if (null != this.barrierLock) {
+ final Step<?, ?> step = (Step) this.barrierLock;
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
+ // broadcast to all workers that the barrier is unlocked
+ this.broadcast(new BarrierDoneMessage(this.barrierLock));
+ this.barrierLock = null;
+ this.haltSynchronization.clear();
+ } else
+ context().system().terminate();
+ }
+ } else
+ this.haltSynchronization.remove(sender().path());
+
}).build());
}
@@ -127,7 +119,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require
this.workers.add(context().actorSelection(worker.path()));
}
for (final ActorSelection worker : this.workers) {
- worker.tell(StartSynchronizationMessage.instance(), self());
+ worker.tell(StartMessage.instance(), self());
}
}
@@ -138,20 +130,25 @@ public final class MasterTraversalActor extends AbstractActor implements Require
}
private void processTraverser(final Traverser.Admin traverser) {
+ if (traverser.isHalted() || traverser.get() instanceof Element) {
+ this.sendTraverser(traverser);
+ } else {
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+ step.addStart(traverser);
+ while (step.hasNext()) {
+ this.processTraverser(step.next());
+ }
+ }
+ }
+
+ private void sendTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted()) {
System.out.println("master[result]: " + traverser);
- broadcast(new HaltSynchronizationMessage(true));
+ } else if (traverser.get() instanceof Element) {
+ final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
+ context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser, self());
} else {
- if (traverser.get() instanceof Element) {
- final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
- context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser, self());
- } else {
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- step.addStart(traverser);
- while (step.hasNext()) {
- this.processTraverser(step.next());
- }
- }
+ self().tell(traverser, self());
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index b60f267..6f83920 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -65,19 +66,23 @@ public final class TinkerActorSystem {
g.V().repeat(both()).times(2).
groupCount("a").by("name").
cap("a").
- select(Column.keys).unfold().limit(3).asAdmin(),
- // barries work and beyond the local star graph works
+ unfold().
+ order().by(Column.values, Order.decr).limit(3).asAdmin(),
+ // barriers work and beyond the local star graph works
g.V().repeat(both()).times(2).hasLabel("person").
group().
by("name").
- by(out("created").values("name").dedup().fold()).asAdmin()
+ by(out("created").values("name").dedup().fold()).asAdmin(),
+ // no results works
+ g.V().out("blah").asAdmin()
);
for (final Traversal.Admin<?, ?> traversal : traversals) {
System.out.println("EXECUTING: " + traversal.getBytecode());
- final TinkerActorSystem actors = new TinkerActorSystem(traversal);
+ final TinkerActorSystem actors = new TinkerActorSystem(traversal.clone());
while (!actors.system.isTerminated()) {
}
+ //System.out.println(traversal.toList());
System.out.println("//////////////////////////////////\n");
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index e0f84dc..f9343e6 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -28,9 +28,9 @@ import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
import scala.Option;
import java.util.Queue;
@@ -57,11 +57,11 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
public void enqueue(final ActorRef receiver, final Envelope handle) {
if (handle.message() instanceof Traverser.Admin)
this.traverserSet.offer((Traverser.Admin) handle.message());
- else if (handle.message() instanceof SideEffectMergeMessage)
+ else if (handle.message() instanceof SideEffectAddMessage)
this.queue.offer(handle);
- else if (handle.message() instanceof BarrierSynchronizationMessage)
+ else if (handle.message() instanceof BarrierDoneMessage)
this.barrierSyncs.offer(handle);
- else if (handle.message() instanceof HaltSynchronizationMessage)
+ else if (handle.message() instanceof VoteToHaltMessage)
this.haltSyncs.offer(handle);
else
this.queue.offer(handle);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 20475ef..f280678 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -20,6 +20,7 @@
package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.dispatch.RequiresMessageQueue;
import akka.japi.pf.ReceiveBuilder;
@@ -27,17 +28,16 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMergeMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
import java.util.HashMap;
import java.util.Map;
@@ -48,12 +48,14 @@ import java.util.Map;
public final class WorkerTraversalActor extends AbstractActor implements
RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
- private final TraversalMatrix<?, ?> matrix;
+ private TraversalMatrix<?, ?> matrix = null;
private final Partition localPartition;
private final Partitioner partitioner;
- private boolean sentHaltMessage = false;
+ private boolean voteToHalt = false;
private final Map<String, ActorSelection> workers = new HashMap<>();
+ private Barrier barrierLock = null;
+
public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
System.out.println("worker[created]: " + self().path());
this.matrix = new TraversalMatrix<>(traversal);
@@ -63,40 +65,68 @@ public final class WorkerTraversalActor extends AbstractActor implements
((GraphStep) traversal.getStartStep()).setIteratorSupplier(localPartition::vertices);
receive(ReceiveBuilder.
- match(StartSynchronizationMessage.class, start -> {
+ match(StartMessage.class, start -> {
+ // initial message from master that says: "start processing"
final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
while (step.hasNext()) {
- this.processTraverser(step.next());
+ this.sendTraverser(step.next());
}
+ // internal vote to have in mailbox as final message to process
+ self().tell(new VoteToHaltMessage(true), self());
}).
- match(BarrierSynchronizationMessage.class, barrierSync -> {
- final Barrier barrier = this.matrix.getStepById(barrierSync.getStepId());
- if (barrierSync.getLock()) {
- this.processBarrier(barrier);
- } else {
- barrier.done();
+ match(Traverser.Admin.class, traverser -> {
+ if (this.voteToHalt) {
+ // tell master you no longer want to halt
+ master().tell(new VoteToHaltMessage(false), self());
+ // internal vote to have in mailbox as final message to process
+ self().tell(new VoteToHaltMessage(true), self());
+ this.voteToHalt = false;
}
+ this.processTraverser(traverser);
}).
- match(SideEffectMergeMessage.class, sideEffect -> {
+ match(SideEffectAddMessage.class, sideEffect -> {
// TODO: sideEffect.setSideEffect(this.matrix.getTraversal());
}).
- match(HaltSynchronizationMessage.class, haltSync -> {
- sender().tell(new HaltSynchronizationMessage(true), self());
- this.sentHaltMessage = true;
+ match(BarrierDoneMessage.class, barrierSync -> {
+ // barrier is complete and processing can continue
+ if (null != this.barrierLock) {
+ this.barrierLock.done();
+ this.barrierLock = null;
+ // internal vote to have in mailbox as final message to process
+ self().tell(new VoteToHaltMessage(true), self());
+ }
}).
- match(Traverser.Admin.class, traverser -> {
- if (this.sentHaltMessage) {
- context().parent().tell(new HaltSynchronizationMessage(false), self());
- this.sentHaltMessage = false;
+ match(VoteToHaltMessage.class, haltSync -> {
+ // if there is a barrier and thus, halting at barrier, then process barrier
+ if (null != this.barrierLock) {
+ while (this.barrierLock.hasNextBarrier()) {
+ master().tell(new BarrierAddMessage(this.barrierLock), self());
+ }
}
- this.processTraverser(traverser);
+ // the final message in the worker mail box, tell master you are done processing messages
+ master().tell(new VoteToHaltMessage(true), self());
+ this.voteToHalt = true;
}).build()
);
}
private void processTraverser(final Traverser.Admin traverser) {
+ assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+ step.addStart(traverser);
+ if (step instanceof Barrier) {
+ assert null == this.barrierLock || step == this.barrierLock;
+ this.barrierLock = (Barrier) step;
+ } else {
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
+ }
+ }
+
+ private void sendTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted())
- context().parent().tell(traverser, self());
+ master().tell(traverser, self());
else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
final String workerPathString = "../worker-" + otherPartition.hashCode();
@@ -106,25 +136,11 @@ public final class WorkerTraversalActor extends AbstractActor implements
this.workers.put(workerPathString, worker);
}
worker.tell(traverser, self());
- } else {
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- step.addStart(traverser);
- if (step instanceof Barrier) {
- this.processBarrier((Barrier) step);
- } else {
- while (step.hasNext()) {
- this.processTraverser(step.next());
- }
- }
- }
+ } else
+ self().tell(traverser, self());
}
- private void processBarrier(final Barrier barrier) {
- if (barrier instanceof Bypassing)
- ((Bypassing) barrier).setBypass(true);
- while (barrier.hasNextBarrier()) {
- context().parent().tell(new BarrierMergeMessage(barrier), self());
- }
- context().parent().tell(new BarrierSynchronizationMessage(barrier, true), self());
+ private ActorRef master() {
+ return context().parent();
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierAddMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierAddMessage.java
new file mode 100644
index 0000000..4e204f1
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierAddMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierAddMessage {
+
+ private final Object barrier;
+ private final String stepId;
+
+ public BarrierAddMessage(final Barrier barrier) {
+ this.barrier = barrier.nextBarrier();
+ this.stepId = ((Step) barrier).getId();
+ }
+
+ public Object getBarrier() {
+ return this.barrier;
+ }
+
+ public String getStepId() {
+ return this.stepId;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierDoneMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierDoneMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierDoneMessage.java
new file mode 100644
index 0000000..73c0611
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierDoneMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierDoneMessage implements SynchronizationMessage {
+
+ private final String stepId;
+
+ public BarrierDoneMessage(final Barrier barrier) {
+ this.stepId = ((Step) barrier).getId();
+ }
+
+ public String getStepId() {
+ return this.stepId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
deleted file mode 100644
index 55e3e70..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierMergeMessage {
-
- private final Object barrier;
- private final String stepId;
-
- public BarrierMergeMessage(final Barrier barrier) {
- this.barrier = barrier.nextBarrier();
- this.stepId = ((Step) barrier).getId();
- }
-
- public Object getBarrier() {
- return this.barrier;
- }
-
- public String getStepId() {
- return this.stepId;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
deleted file mode 100644
index 467ac28..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/** Synchronization message that pairs a barrier step's id with a boolean lock flag.
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierSynchronizationMessage implements SynchronizationMessage {
-
- private final String stepId; // id of the barrier step, read via Step.getId()
- private final boolean lock; // NOTE(review): presumably true = lock the barrier, false = release it -- confirm against the actors
-
- public BarrierSynchronizationMessage(final Barrier barrier, final boolean lock) {
- this.stepId = ((Step) barrier).getId(); // the cast assumes the Barrier is also a Step
- this.lock = lock;
- }
-
- public String getStepId() {
- return this.stepId;
- }
-
- public boolean getLock() { // returns the flag exactly as passed to the constructor
- return this.lock;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
deleted file mode 100644
index f3419a0..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/** Synchronization message that carries a single boolean halt flag.
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HaltSynchronizationMessage implements SynchronizationMessage {
-
- private final boolean halt; // NOTE(review): presumably the sender's vote on whether processing may stop -- confirm against the actors
-
- public HaltSynchronizationMessage(final boolean halt) {
- this.halt = halt;
- }
-
- public boolean isHalt() { // returns the flag exactly as passed to the constructor
- return this.halt;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectAddMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectAddMessage.java
new file mode 100644
index 0000000..f010dbb
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectAddMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/** Message that pairs a side-effect key with a value; both are fixed at construction.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectAddMessage {
+
+ private final String sideEffectKey; // name of the side-effect this message refers to
+ private final Object sideEffect; // value carried for that key; stored by reference, not copied
+
+ public SideEffectAddMessage(final String sideEffectKey, final Object sideEffect) { // no null checks are performed
+ this.sideEffect = sideEffect;
+ this.sideEffectKey = sideEffectKey;
+ }
+
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ public Object getSideEffectValue() { // returns the same object reference that was passed in
+ return this.sideEffect;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
deleted file mode 100644
index 1cb6866..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/** Message that pairs a side-effect key with a value; both are fixed at construction.
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectMergeMessage {
-
- private final String sideEffectKey; // name of the side-effect this message refers to
- private final Object sideEffect; // value carried for that key; stored by reference, not copied
-
- public SideEffectMergeMessage(final String sideEffectKey, final Object sideEffect) { // no null checks are performed
- this.sideEffect = sideEffect;
- this.sideEffectKey = sideEffectKey;
- }
-
- public String getKey() { // the side-effect key given to the constructor
- return this.sideEffectKey;
- }
-
- public Object getValue() { // returns the same object reference that was passed in
- return this.sideEffect;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
new file mode 100644
index 0000000..7acb251
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/** Payload-free message type that exposes one shared instance through {@link #instance()}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StartMessage { // NOTE(review): no private constructor, so the implicit public one still allows extra instances
+
+ private static final StartMessage INSTANCE = new StartMessage(); // created once when the class is initialized
+
+ public static StartMessage instance() { // always returns the same INSTANCE reference
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
deleted file mode 100644
index bab03f5..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/** Payload-free message type that exposes one shared instance through {@link #instance()}.
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class StartSynchronizationMessage { // NOTE(review): no private constructor, so the implicit public one still allows extra instances
-
- private static final StartSynchronizationMessage INSTANCE = new StartSynchronizationMessage(); // created once when the class is initialized
-
- public static StartSynchronizationMessage instance() { // always returns the same INSTANCE reference
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8ef0897f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
new file mode 100644
index 0000000..ca5929f
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/** Synchronization message that carries a single boolean vote-to-halt flag.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VoteToHaltMessage implements SynchronizationMessage {
+
+ private final boolean voteToHalt; // NOTE(review): presumably true means the sender agrees to halt -- confirm against the actors
+
+ public VoteToHaltMessage(final boolean voteToHalt) {
+ this.voteToHalt = voteToHalt;
+ }
+
+ public boolean voteToHalt() { // accessor named after the field; returns the flag unchanged
+ return this.voteToHalt;
+ }
+}