You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/09 13:16:47 UTC

tinkerpop git commit: an old college textbook taught me the dual-ring termination algorithm for terminating a distributed message-passing system when inactive actors can become active again. A termination token (enum) is passed between the workers. […subject truncated; full commit message below]

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 b8a226fae -> 9a6be81b5


An old college textbook taught me the dual-ring termination algorithm for terminating a distributed message-passing system when inactive actors can become active again. A termination token (enum) is passed between the workers. Previously, I was doing bookkeeping at the master, and it was not a sound algorithm. This algorithm is simpler and allowed me to remove lots of code. After 10,000 iterations of 3 tests, there were no deadlocks and no wrong answers. Finally -- that was shitty.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/9a6be81b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/9a6be81b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/9a6be81b

Branch: refs/heads/TINKERPOP-1564
Commit: 9a6be81b589110feda97cf29441b6d891388681a
Parents: b8a226f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Dec 9 06:16:42 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Dec 9 06:16:42 2016 -0700

----------------------------------------------------------------------
 .../process/akka/MasterTraversalActor.java      | 56 +++----------
 .../process/akka/WorkerTraversalActor.java      | 83 +++++++++++---------
 .../akka/messages/VoteToContinueMessage.java    | 34 --------
 3 files changed, 60 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9a6be81b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 41dff8f..ff72ec0 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -20,7 +20,6 @@
 package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.AbstractActor;
-import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
@@ -43,14 +42,11 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAdd
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToContinueMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -61,10 +57,9 @@ public final class MasterTraversalActor extends AbstractActor implements Require
     private final TraversalMatrix<?, ?> matrix;
     private final Partitioner partitioner;
     private final Map<String, ActorSelection> workers = new HashMap<>();
-    private final Set<ActorPath> haltSynchronization = new HashSet<>();
     private Barrier barrierLock = null;
     private final TraverserSet<?> results;
-    private boolean firstHalt = true;
+    private final String leaderWorker;
 
     public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
         System.out.println("master[created]: " + self().path());
@@ -78,58 +73,32 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         this.partitioner = partitioner;
         this.results = results;
         this.initializeWorkers();
+        this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode();
 
         receive(ReceiveBuilder.
                 match(Traverser.Admin.class, traverser -> {
-                    this.firstHalt = true;
-                    this.haltSynchronization.remove(sender().path());
                     this.processTraverser(traverser);
                 }).
                 match(BarrierAddMessage.class, barrierMerge -> {
-                    this.firstHalt = true;
-                    this.haltSynchronization.remove(sender().path());
                     final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
                     assert null == this.barrierLock || this.barrierLock == barrier;
                     this.barrierLock = barrier;
                     this.barrierLock.addBarrier(barrierMerge.getBarrier());
-
                 }).
                 match(SideEffectAddMessage.class, sideEffect -> {
-                    this.firstHalt = true;
-                    this.haltSynchronization.remove(sender().path());
                     this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
                 }).
-                match(VoteToContinueMessage.class, voteToContinue -> {
-                    this.firstHalt = true;
-                    this.haltSynchronization.remove(sender().path());
-                    sender().tell(VoteToContinueMessage.instance(), self());
-                }).
                 match(VoteToHaltMessage.class, voteToHalt -> {
                     assert !sender().equals(self());
-                    // receive vote to halt messages from worker
-                    // when all workers  have voted to halt then terminate the system
-                    this.haltSynchronization.add(sender().path());
-                    if (this.haltSynchronization.size() == this.workers.size()) {
-                        if (this.firstHalt) {
-                            // a double vote mechanism is in play to ensure the all agents are fully complete
-                            this.firstHalt = false;
-                            this.haltSynchronization.clear();
-                            this.broadcast(VoteToHaltMessage.instance());
-                        } else {
-                            if (null != this.barrierLock) {
-                                final Step<?, ?> step = (Step) this.barrierLock;
-                                while (step.hasNext()) {
-                                    this.sendTraverser(step.next());
-                                }
-                                // broadcast to all workers that the barrier is unlocked
-                                this.broadcast(new BarrierDoneMessage(this.barrierLock));
-                                this.barrierLock = null;
-                                this.firstHalt = true;
-                                this.haltSynchronization.clear();
-                            } else
-                                context().system().terminate();
+                    if (null != this.barrierLock) {
+                        final Step<?, ?> step = (Step) this.barrierLock;
+                        while (step.hasNext()) {
+                            this.sendTraverser(step.next());
                         }
-                    }
+                        this.barrierLock = null;
+                        this.workers.get(this.leaderWorker).tell(WorkerTraversalActor.Terminate.MAYBE,self());
+                    } else
+                        context().system().terminate();
                 }).build());
     }
 
@@ -145,11 +114,11 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         }
     }
 
-    private void broadcast(final Object message) {
+    /*private void broadcast(final Object message) {
         for (final ActorSelection worker : this.workers.values()) {
             worker.tell(message, self());
         }
-    }
+    }*/
 
     private void processTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted() || traverser.get() instanceof Element) {
@@ -169,7 +138,6 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         } else if (traverser.get() instanceof Element) {
             final Partition partition = this.partitioner.getPartition((Element) traverser.get());
             final ActorRef worker = this.workers.get("worker-" + partition.hashCode()).anchor();
-            this.haltSynchronization.remove(worker.path());
             worker.tell(traverser, self());
         } else {
             self().tell(traverser, self());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9a6be81b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 60ae1c6..866286d 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAdd
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToContinueMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
 
 import java.util.HashMap;
@@ -46,15 +45,19 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class WorkerTraversalActor extends AbstractActor implements
-        RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
+public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
+
+    public enum Terminate {MAYBE, YES, NO}
 
     private TraversalMatrix<?, ?> matrix = null;
     private final Partition localPartition;
     private final Partitioner partitioner;
-    private boolean voteToHalt = false;
+    private boolean voteToHalt = true;
     private final Map<String, ActorSelection> workers = new HashMap<>();
-
+    private String neighbor = null;
+    private boolean leader = false;
+    private Terminate terminationToken = null;
+    ///
     private Barrier barrierLock = null;
 
     public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
@@ -63,6 +66,14 @@ public final class WorkerTraversalActor extends AbstractActor implements
         this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
         this.localPartition = localPartition;
         this.partitioner = partitioner;
+        // create termination ring topology
+        for (int i = 0; i < this.partitioner.getPartitions().size(); i++) {
+            if (this.partitioner.getPartitions().get(i) == this.localPartition) {
+                this.neighbor = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
+                this.leader = i == 0;
+                break;
+            }
+        }
         ((GraphStep) traversal.getStartStep()).setIteratorSupplier(localPartition::vertices);
 
         receive(ReceiveBuilder.
@@ -73,48 +84,47 @@ public final class WorkerTraversalActor extends AbstractActor implements
                         this.sendTraverser(step.next());
                     }
                     // internal vote to have in mailbox as final message to process
-                    self().tell(VoteToHaltMessage.instance(), self());
+                    if (this.leader)
+                        self().tell(Terminate.MAYBE, self());
                 }).
                 match(Traverser.Admin.class, traverser -> {
-                    // internal vote to have in mailbox as final message to process
-                    if (this.voteToHalt) {
-                        this.voteToHalt = false;
-                        master().tell(VoteToContinueMessage.instance(), self());
-                    }
+                    this.voteToHalt = false;
                     this.processTraverser(traverser);
                 }).
                 match(SideEffectAddMessage.class, sideEffect -> {
                     // TODO
                 }).
-                match(VoteToContinueMessage.class, voteToContinueMessage -> {
-                    this.voteToHalt = false;
-                    self().tell(VoteToHaltMessage.instance(), self());
-                }).
                 match(BarrierDoneMessage.class, barrierSync -> {
                     // barrier is complete and processing can continue
                     if (null != this.barrierLock) {
                         this.barrierLock.done();
                         this.barrierLock = null;
                     }
-                    // internal vote to have in mailbox as final message to process
-                    if (this.voteToHalt) {
-                        this.voteToHalt = false;
-                        master().tell(VoteToContinueMessage.instance(), self());
-                    }
+                    if (this.leader)
+                        self().tell(Terminate.MAYBE, self());
+                }).
+                match(Terminate.class, terminate -> {
+                    assert this.leader || this.terminationToken != Terminate.MAYBE;
+                    this.terminationToken = terminate;
+                    self().tell(VoteToHaltMessage.instance(), self());
                 }).
                 match(VoteToHaltMessage.class, haltSync -> {
-                    if (sender().equals(master()))
-                        this.voteToHalt = false;
                     // if there is a barrier and thus, halting at barrier, then process barrier
-                    boolean hasBarrier = null != this.barrierLock && this.barrierLock.hasNextBarrier();
-                    if (hasBarrier) {
+                    if (null != this.barrierLock && this.barrierLock.hasNextBarrier()) {
                         while (this.barrierLock.hasNextBarrier()) {
                             master().tell(new BarrierAddMessage(this.barrierLock), self());
                         }
-                    }
-                    if (!this.voteToHalt) {
-                        // the final message in the worker mail box, tell master you are done processing messages
-                        master().tell(VoteToHaltMessage.instance(), self());
+                        self().tell(VoteToHaltMessage.instance(), self());
+                    } else if (null != this.terminationToken) {
+                        // use termination token to determine termination condition
+                        if (this.leader) {
+                            if (this.terminationToken == Terminate.YES && this.voteToHalt)
+                                master().tell(VoteToHaltMessage.instance(), self());
+                            else
+                                worker(this.neighbor).tell(Terminate.YES, self());
+                        } else
+                            worker(this.neighbor).tell(this.voteToHalt ? this.terminationToken : Terminate.NO, self());
+                        this.terminationToken = null;
                         this.voteToHalt = true;
                     }
                 }).build()
@@ -140,17 +150,20 @@ public final class WorkerTraversalActor extends AbstractActor implements
             master().tell(traverser, self());
         else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
             final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
-            final String workerPathString = "../worker-" + otherPartition.hashCode();
-            ActorSelection worker = this.workers.get(workerPathString);
-            if (null == worker) {
-                worker = context().actorSelection(workerPathString);
-                this.workers.put(workerPathString, worker);
-            }
-            worker.tell(traverser, self());
+            worker("../worker-" + otherPartition.hashCode()).tell(traverser, self());
         } else
             self().tell(traverser, self());
     }
 
+    private ActorSelection worker(final String path) {
+        ActorSelection worker = this.workers.get(path);
+        if (null == worker) {
+            worker = context().actorSelection(path);
+            this.workers.put(path, worker);
+        }
+        return worker;
+    }
+
     private ActorRef master() {
         return context().parent();
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9a6be81b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
deleted file mode 100644
index 1faa7a0..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class VoteToContinueMessage {
-
-    private static final VoteToContinueMessage INSTANCE = new VoteToContinueMessage();
-
-    private VoteToContinueMessage() {}
-
-    public static VoteToContinueMessage instance() {
-        return INSTANCE;
-    }
-}