You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/09 01:36:45 UTC
tinkerpop git commit: Getting closer. The barriers are all solid;
the problem is when a worker runs out of messages and votes to halt
early, then receives a traverser and retracts its vote to halt, but the
master has already committed to shutting down.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 4a2a870db -> b8a226fae
Getting closer. The barriers are all solid; the problem is when a worker runs out of messages and votes to halt early, then receives a traverser and retracts its vote to halt, but the master has already committed to shutting down.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b8a226fa
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b8a226fa
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b8a226fa
Branch: refs/heads/TINKERPOP-1564
Commit: b8a226fae7407f1a1dbc046204ae9582169e458e
Parents: 4a2a870
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Dec 8 18:36:39 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Dec 8 18:36:39 2016 -0700
----------------------------------------------------------------------
.../process/akka/MasterTraversalActor.java | 39 ++++++++++++++------
.../process/akka/TinkerActorSystem.java | 10 ++---
.../process/akka/WorkerTraversalActor.java | 27 ++++++++------
3 files changed, 47 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b8a226fa/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 2899c28..41dff8f 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -81,21 +81,28 @@ public final class MasterTraversalActor extends AbstractActor implements Require
receive(ReceiveBuilder.
match(Traverser.Admin.class, traverser -> {
+ this.firstHalt = true;
+ this.haltSynchronization.remove(sender().path());
this.processTraverser(traverser);
}).
match(BarrierAddMessage.class, barrierMerge -> {
+ this.firstHalt = true;
+ this.haltSynchronization.remove(sender().path());
final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
assert null == this.barrierLock || this.barrierLock == barrier;
this.barrierLock = barrier;
this.barrierLock.addBarrier(barrierMerge.getBarrier());
- this.haltSynchronization.remove(sender().path());
+
}).
match(SideEffectAddMessage.class, sideEffect -> {
+ this.firstHalt = true;
+ this.haltSynchronization.remove(sender().path());
this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
- //this.haltSynchronization.remove(sender().path());
}).
match(VoteToContinueMessage.class, voteToContinue -> {
+ this.firstHalt = true;
this.haltSynchronization.remove(sender().path());
+ sender().tell(VoteToContinueMessage.instance(), self());
}).
match(VoteToHaltMessage.class, voteToHalt -> {
assert !sender().equals(self());
@@ -103,17 +110,25 @@ public final class MasterTraversalActor extends AbstractActor implements Require
// when all workers have voted to halt then terminate the system
this.haltSynchronization.add(sender().path());
if (this.haltSynchronization.size() == this.workers.size()) {
- if (null != this.barrierLock) {
- final Step<?, ?> step = (Step) this.barrierLock;
- while (step.hasNext()) {
- this.sendTraverser(step.next());
- }
- // broadcast to all workers that the barrier is unlocked
- this.broadcast(new BarrierDoneMessage(this.barrierLock));
- this.barrierLock = null;
+ if (this.firstHalt) {
+ // a double vote mechanism is in play to ensure the all agents are fully complete
+ this.firstHalt = false;
this.haltSynchronization.clear();
- } else
- context().system().terminate();
+ this.broadcast(VoteToHaltMessage.instance());
+ } else {
+ if (null != this.barrierLock) {
+ final Step<?, ?> step = (Step) this.barrierLock;
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
+ // broadcast to all workers that the barrier is unlocked
+ this.broadcast(new BarrierDoneMessage(this.barrierLock));
+ this.barrierLock = null;
+ this.firstHalt = true;
+ this.haltSynchronization.clear();
+ } else
+ context().system().terminate();
+ }
}
}).build());
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b8a226fa/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index 96b8734..2cf88a7 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -63,7 +63,7 @@ public final class TinkerActorSystem {
//////////////
public static void main(String args[]) throws Exception {
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 10000; i++) {
final Graph graph = TinkerGraph.open();
graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
final GraphTraversalSource g = graph.traversal().withComputer();
@@ -74,9 +74,9 @@ public final class TinkerActorSystem {
as("b").in("created").as("c"),
as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin()),
// side-effects work
- Pair.with(1, g.V().repeat(both()).times(2).
+ Pair.with(3, g.V().repeat(both()).times(2).
groupCount("a").by("name").
- cap("a").asAdmin()),
+ cap("a").unfold().order().by(Column.values, Order.decr).limit(3).asAdmin()),
// barriers work and beyond the local star graph works
Pair.with(1, g.V().repeat(both()).times(2).hasLabel("person").
group().
@@ -94,8 +94,8 @@ public final class TinkerActorSystem {
}
System.out.println(IteratorUtils.asList(actors.getResults()));
- //if(count != 6 && IteratorUtils.count(actors.getResults()) != count)
- // throw new IllegalStateException();
+ if(IteratorUtils.count(actors.getResults()) != count)
+ throw new IllegalStateException();
System.out.println("//////////////////////////////////\n");
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b8a226fa/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 5dc1931..60ae1c6 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -74,21 +74,22 @@ public final class WorkerTraversalActor extends AbstractActor implements
}
// internal vote to have in mailbox as final message to process
self().tell(VoteToHaltMessage.instance(), self());
- this.voteToHalt = false;
}).
match(Traverser.Admin.class, traverser -> {
+ // internal vote to have in mailbox as final message to process
if (this.voteToHalt) {
- // tell master you no longer want to halt
- master().tell(VoteToContinueMessage.instance(), self());
- // internal vote to have in mailbox as final message to process
- self().tell(VoteToHaltMessage.instance(), self());
this.voteToHalt = false;
+ master().tell(VoteToContinueMessage.instance(), self());
}
this.processTraverser(traverser);
}).
match(SideEffectAddMessage.class, sideEffect -> {
// TODO
}).
+ match(VoteToContinueMessage.class, voteToContinueMessage -> {
+ this.voteToHalt = false;
+ self().tell(VoteToHaltMessage.instance(), self());
+ }).
match(BarrierDoneMessage.class, barrierSync -> {
// barrier is complete and processing can continue
if (null != this.barrierLock) {
@@ -96,20 +97,22 @@ public final class WorkerTraversalActor extends AbstractActor implements
this.barrierLock = null;
}
// internal vote to have in mailbox as final message to process
- self().tell(VoteToHaltMessage.instance(), self());
- this.voteToHalt = false;
+ if (this.voteToHalt) {
+ this.voteToHalt = false;
+ master().tell(VoteToContinueMessage.instance(), self());
+ }
}).
match(VoteToHaltMessage.class, haltSync -> {
- assert sender().equals(self());
- boolean hasBarrier = null != this.barrierLock && this.barrierLock.hasNextBarrier();
+ if (sender().equals(master()))
+ this.voteToHalt = false;
// if there is a barrier and thus, halting at barrier, then process barrier
+ boolean hasBarrier = null != this.barrierLock && this.barrierLock.hasNextBarrier();
if (hasBarrier) {
while (this.barrierLock.hasNextBarrier()) {
master().tell(new BarrierAddMessage(this.barrierLock), self());
}
- self().tell(VoteToHaltMessage.instance(), self());
- this.voteToHalt = false;
- } else if (!this.voteToHalt) {
+ }
+ if (!this.voteToHalt) {
// the final message in the worker mail box, tell master you are done processing messages
master().tell(VoteToHaltMessage.instance(), self());
this.voteToHalt = true;