You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/09 01:36:45 UTC

tinkerpop git commit: Getting closer. Barriers are all solid; the remaining problem is when a worker runs out of messages and votes to halt early, then receives a traverser and withdraws its vote to halt, but the master has already committed to shutting down.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 4a2a870db -> b8a226fae


Getting closer. Barriers are all solid; the remaining problem is when a worker runs out of messages and votes to halt early, then receives a traverser and withdraws its vote to halt, but the master has already committed to shutting down.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b8a226fa
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b8a226fa
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b8a226fa

Branch: refs/heads/TINKERPOP-1564
Commit: b8a226fae7407f1a1dbc046204ae9582169e458e
Parents: 4a2a870
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Dec 8 18:36:39 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Dec 8 18:36:39 2016 -0700

----------------------------------------------------------------------
 .../process/akka/MasterTraversalActor.java      | 39 ++++++++++++++------
 .../process/akka/TinkerActorSystem.java         | 10 ++---
 .../process/akka/WorkerTraversalActor.java      | 27 ++++++++------
 3 files changed, 47 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b8a226fa/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 2899c28..41dff8f 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -81,21 +81,28 @@ public final class MasterTraversalActor extends AbstractActor implements Require
 
         receive(ReceiveBuilder.
                 match(Traverser.Admin.class, traverser -> {
+                    this.firstHalt = true;
+                    this.haltSynchronization.remove(sender().path());
                     this.processTraverser(traverser);
                 }).
                 match(BarrierAddMessage.class, barrierMerge -> {
+                    this.firstHalt = true;
+                    this.haltSynchronization.remove(sender().path());
                     final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
                     assert null == this.barrierLock || this.barrierLock == barrier;
                     this.barrierLock = barrier;
                     this.barrierLock.addBarrier(barrierMerge.getBarrier());
-                    this.haltSynchronization.remove(sender().path());
+
                 }).
                 match(SideEffectAddMessage.class, sideEffect -> {
+                    this.firstHalt = true;
+                    this.haltSynchronization.remove(sender().path());
                     this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
-                    //this.haltSynchronization.remove(sender().path());
                 }).
                 match(VoteToContinueMessage.class, voteToContinue -> {
+                    this.firstHalt = true;
                     this.haltSynchronization.remove(sender().path());
+                    sender().tell(VoteToContinueMessage.instance(), self());
                 }).
                 match(VoteToHaltMessage.class, voteToHalt -> {
                     assert !sender().equals(self());
@@ -103,17 +110,25 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                     // when all workers  have voted to halt then terminate the system
                     this.haltSynchronization.add(sender().path());
                     if (this.haltSynchronization.size() == this.workers.size()) {
-                        if (null != this.barrierLock) {
-                            final Step<?, ?> step = (Step) this.barrierLock;
-                            while (step.hasNext()) {
-                                this.sendTraverser(step.next());
-                            }
-                            // broadcast to all workers that the barrier is unlocked
-                            this.broadcast(new BarrierDoneMessage(this.barrierLock));
-                            this.barrierLock = null;
+                        if (this.firstHalt) {
+                            // a double vote mechanism is in play to ensure that all agents are fully complete
+                            this.firstHalt = false;
                             this.haltSynchronization.clear();
-                        } else
-                            context().system().terminate();
+                            this.broadcast(VoteToHaltMessage.instance());
+                        } else {
+                            if (null != this.barrierLock) {
+                                final Step<?, ?> step = (Step) this.barrierLock;
+                                while (step.hasNext()) {
+                                    this.sendTraverser(step.next());
+                                }
+                                // broadcast to all workers that the barrier is unlocked
+                                this.broadcast(new BarrierDoneMessage(this.barrierLock));
+                                this.barrierLock = null;
+                                this.firstHalt = true;
+                                this.haltSynchronization.clear();
+                            } else
+                                context().system().terminate();
+                        }
                     }
                 }).build());
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b8a226fa/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index 96b8734..2cf88a7 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -63,7 +63,7 @@ public final class TinkerActorSystem {
     //////////////
 
     public static void main(String args[]) throws Exception {
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 10000; i++) {
             final Graph graph = TinkerGraph.open();
             graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
             final GraphTraversalSource g = graph.traversal().withComputer();
@@ -74,9 +74,9 @@ public final class TinkerActorSystem {
                             as("b").in("created").as("c"),
                             as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin()),
                     // side-effects work
-                    Pair.with(1, g.V().repeat(both()).times(2).
+                    Pair.with(3, g.V().repeat(both()).times(2).
                             groupCount("a").by("name").
-                            cap("a").asAdmin()),
+                            cap("a").unfold().order().by(Column.values, Order.decr).limit(3).asAdmin()),
                     // barriers work and beyond the local star graph works
                     Pair.with(1, g.V().repeat(both()).times(2).hasLabel("person").
                             group().
@@ -94,8 +94,8 @@ public final class TinkerActorSystem {
 
                 }
                 System.out.println(IteratorUtils.asList(actors.getResults()));
-                //if(count != 6 && IteratorUtils.count(actors.getResults()) != count)
-                //   throw new IllegalStateException();
+                if(IteratorUtils.count(actors.getResults()) != count)
+                   throw new IllegalStateException();
                 System.out.println("//////////////////////////////////\n");
             }
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b8a226fa/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 5dc1931..60ae1c6 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -74,21 +74,22 @@ public final class WorkerTraversalActor extends AbstractActor implements
                     }
                     // internal vote to have in mailbox as final message to process
                     self().tell(VoteToHaltMessage.instance(), self());
-                    this.voteToHalt = false;
                 }).
                 match(Traverser.Admin.class, traverser -> {
+                    // internal vote to have in mailbox as final message to process
                     if (this.voteToHalt) {
-                        // tell master you no longer want to halt
-                        master().tell(VoteToContinueMessage.instance(), self());
-                        // internal vote to have in mailbox as final message to process
-                        self().tell(VoteToHaltMessage.instance(), self());
                         this.voteToHalt = false;
+                        master().tell(VoteToContinueMessage.instance(), self());
                     }
                     this.processTraverser(traverser);
                 }).
                 match(SideEffectAddMessage.class, sideEffect -> {
                     // TODO
                 }).
+                match(VoteToContinueMessage.class, voteToContinueMessage -> {
+                    this.voteToHalt = false;
+                    self().tell(VoteToHaltMessage.instance(), self());
+                }).
                 match(BarrierDoneMessage.class, barrierSync -> {
                     // barrier is complete and processing can continue
                     if (null != this.barrierLock) {
@@ -96,20 +97,22 @@ public final class WorkerTraversalActor extends AbstractActor implements
                         this.barrierLock = null;
                     }
                     // internal vote to have in mailbox as final message to process
-                    self().tell(VoteToHaltMessage.instance(), self());
-                    this.voteToHalt = false;
+                    if (this.voteToHalt) {
+                        this.voteToHalt = false;
+                        master().tell(VoteToContinueMessage.instance(), self());
+                    }
                 }).
                 match(VoteToHaltMessage.class, haltSync -> {
-                    assert sender().equals(self());
-                    boolean hasBarrier = null != this.barrierLock && this.barrierLock.hasNextBarrier();
+                    if (sender().equals(master()))
+                        this.voteToHalt = false;
                     // if there is a barrier and thus, halting at barrier, then process barrier
+                    boolean hasBarrier = null != this.barrierLock && this.barrierLock.hasNextBarrier();
                     if (hasBarrier) {
                         while (this.barrierLock.hasNextBarrier()) {
                             master().tell(new BarrierAddMessage(this.barrierLock), self());
                         }
-                        self().tell(VoteToHaltMessage.instance(), self());
-                        this.voteToHalt = false;
-                    } else if (!this.voteToHalt) {
+                    }
+                    if (!this.voteToHalt) {
                         // the final message in the worker mail box, tell master you are done processing messages
                         master().tell(VoteToHaltMessage.instance(), self());
                         this.voteToHalt = true;