You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/09 13:16:47 UTC
tinkerpop git commit: an old college textbook taught me the
dual-ring termination algorithm for terminating a distributed message-passing
system when inactive actors can become active again. A termination token
(enum) is passed between the workers. Prior...
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 b8a226fae -> 9a6be81b5
An old college textbook taught me the dual-ring termination algorithm for terminating a distributed message-passing system in which inactive actors can become active again. A termination token (an enum) is passed around a ring of workers. Previously, I was doing bookkeeping at the master, and that approach was not a sound algorithm. This algorithm is simpler and allowed me to remove a lot of code. After 10,000 iterations of 3 tests, there were no deadlocks and no wrong answers. Finally -- that was painful.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/9a6be81b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/9a6be81b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/9a6be81b
Branch: refs/heads/TINKERPOP-1564
Commit: 9a6be81b589110feda97cf29441b6d891388681a
Parents: b8a226f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Dec 9 06:16:42 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Dec 9 06:16:42 2016 -0700
----------------------------------------------------------------------
.../process/akka/MasterTraversalActor.java | 56 +++----------
.../process/akka/WorkerTraversalActor.java | 83 +++++++++++---------
.../akka/messages/VoteToContinueMessage.java | 34 --------
3 files changed, 60 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9a6be81b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 41dff8f..ff72ec0 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -20,7 +20,6 @@
package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.AbstractActor;
-import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
@@ -43,14 +42,11 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAdd
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToContinueMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -61,10 +57,9 @@ public final class MasterTraversalActor extends AbstractActor implements Require
private final TraversalMatrix<?, ?> matrix;
private final Partitioner partitioner;
private final Map<String, ActorSelection> workers = new HashMap<>();
- private final Set<ActorPath> haltSynchronization = new HashSet<>();
private Barrier barrierLock = null;
private final TraverserSet<?> results;
- private boolean firstHalt = true;
+ private final String leaderWorker;
public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
System.out.println("master[created]: " + self().path());
@@ -78,58 +73,32 @@ public final class MasterTraversalActor extends AbstractActor implements Require
this.partitioner = partitioner;
this.results = results;
this.initializeWorkers();
+ this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode();
receive(ReceiveBuilder.
match(Traverser.Admin.class, traverser -> {
- this.firstHalt = true;
- this.haltSynchronization.remove(sender().path());
this.processTraverser(traverser);
}).
match(BarrierAddMessage.class, barrierMerge -> {
- this.firstHalt = true;
- this.haltSynchronization.remove(sender().path());
final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
assert null == this.barrierLock || this.barrierLock == barrier;
this.barrierLock = barrier;
this.barrierLock.addBarrier(barrierMerge.getBarrier());
-
}).
match(SideEffectAddMessage.class, sideEffect -> {
- this.firstHalt = true;
- this.haltSynchronization.remove(sender().path());
this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
}).
- match(VoteToContinueMessage.class, voteToContinue -> {
- this.firstHalt = true;
- this.haltSynchronization.remove(sender().path());
- sender().tell(VoteToContinueMessage.instance(), self());
- }).
match(VoteToHaltMessage.class, voteToHalt -> {
assert !sender().equals(self());
- // receive vote to halt messages from worker
- // when all workers have voted to halt then terminate the system
- this.haltSynchronization.add(sender().path());
- if (this.haltSynchronization.size() == this.workers.size()) {
- if (this.firstHalt) {
- // a double vote mechanism is in play to ensure the all agents are fully complete
- this.firstHalt = false;
- this.haltSynchronization.clear();
- this.broadcast(VoteToHaltMessage.instance());
- } else {
- if (null != this.barrierLock) {
- final Step<?, ?> step = (Step) this.barrierLock;
- while (step.hasNext()) {
- this.sendTraverser(step.next());
- }
- // broadcast to all workers that the barrier is unlocked
- this.broadcast(new BarrierDoneMessage(this.barrierLock));
- this.barrierLock = null;
- this.firstHalt = true;
- this.haltSynchronization.clear();
- } else
- context().system().terminate();
+ if (null != this.barrierLock) {
+ final Step<?, ?> step = (Step) this.barrierLock;
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
}
- }
+ this.barrierLock = null;
+ this.workers.get(this.leaderWorker).tell(WorkerTraversalActor.Terminate.MAYBE,self());
+ } else
+ context().system().terminate();
}).build());
}
@@ -145,11 +114,11 @@ public final class MasterTraversalActor extends AbstractActor implements Require
}
}
- private void broadcast(final Object message) {
+ /*private void broadcast(final Object message) {
for (final ActorSelection worker : this.workers.values()) {
worker.tell(message, self());
}
- }
+ }*/
private void processTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted() || traverser.get() instanceof Element) {
@@ -169,7 +138,6 @@ public final class MasterTraversalActor extends AbstractActor implements Require
} else if (traverser.get() instanceof Element) {
final Partition partition = this.partitioner.getPartition((Element) traverser.get());
final ActorRef worker = this.workers.get("worker-" + partition.hashCode()).anchor();
- this.haltSynchronization.remove(worker.path());
worker.tell(traverser, self());
} else {
self().tell(traverser, self());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9a6be81b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 60ae1c6..866286d 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAdd
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToContinueMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
import java.util.HashMap;
@@ -46,15 +45,19 @@ import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class WorkerTraversalActor extends AbstractActor implements
- RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
+public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
+
+ public enum Terminate {MAYBE, YES, NO}
private TraversalMatrix<?, ?> matrix = null;
private final Partition localPartition;
private final Partitioner partitioner;
- private boolean voteToHalt = false;
+ private boolean voteToHalt = true;
private final Map<String, ActorSelection> workers = new HashMap<>();
-
+ private String neighbor = null;
+ private boolean leader = false;
+ private Terminate terminationToken = null;
+ ///
private Barrier barrierLock = null;
public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
@@ -63,6 +66,14 @@ public final class WorkerTraversalActor extends AbstractActor implements
this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
this.localPartition = localPartition;
this.partitioner = partitioner;
+ // create termination ring topology
+ for (int i = 0; i < this.partitioner.getPartitions().size(); i++) {
+ if (this.partitioner.getPartitions().get(i) == this.localPartition) {
+ this.neighbor = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
+ this.leader = i == 0;
+ break;
+ }
+ }
((GraphStep) traversal.getStartStep()).setIteratorSupplier(localPartition::vertices);
receive(ReceiveBuilder.
@@ -73,48 +84,47 @@ public final class WorkerTraversalActor extends AbstractActor implements
this.sendTraverser(step.next());
}
// internal vote to have in mailbox as final message to process
- self().tell(VoteToHaltMessage.instance(), self());
+ if (this.leader)
+ self().tell(Terminate.MAYBE, self());
}).
match(Traverser.Admin.class, traverser -> {
- // internal vote to have in mailbox as final message to process
- if (this.voteToHalt) {
- this.voteToHalt = false;
- master().tell(VoteToContinueMessage.instance(), self());
- }
+ this.voteToHalt = false;
this.processTraverser(traverser);
}).
match(SideEffectAddMessage.class, sideEffect -> {
// TODO
}).
- match(VoteToContinueMessage.class, voteToContinueMessage -> {
- this.voteToHalt = false;
- self().tell(VoteToHaltMessage.instance(), self());
- }).
match(BarrierDoneMessage.class, barrierSync -> {
// barrier is complete and processing can continue
if (null != this.barrierLock) {
this.barrierLock.done();
this.barrierLock = null;
}
- // internal vote to have in mailbox as final message to process
- if (this.voteToHalt) {
- this.voteToHalt = false;
- master().tell(VoteToContinueMessage.instance(), self());
- }
+ if (this.leader)
+ self().tell(Terminate.MAYBE, self());
+ }).
+ match(Terminate.class, terminate -> {
+ assert this.leader || this.terminationToken != Terminate.MAYBE;
+ this.terminationToken = terminate;
+ self().tell(VoteToHaltMessage.instance(), self());
}).
match(VoteToHaltMessage.class, haltSync -> {
- if (sender().equals(master()))
- this.voteToHalt = false;
// if there is a barrier and thus, halting at barrier, then process barrier
- boolean hasBarrier = null != this.barrierLock && this.barrierLock.hasNextBarrier();
- if (hasBarrier) {
+ if (null != this.barrierLock && this.barrierLock.hasNextBarrier()) {
while (this.barrierLock.hasNextBarrier()) {
master().tell(new BarrierAddMessage(this.barrierLock), self());
}
- }
- if (!this.voteToHalt) {
- // the final message in the worker mail box, tell master you are done processing messages
- master().tell(VoteToHaltMessage.instance(), self());
+ self().tell(VoteToHaltMessage.instance(), self());
+ } else if (null != this.terminationToken) {
+ // use termination token to determine termination condition
+ if (this.leader) {
+ if (this.terminationToken == Terminate.YES && this.voteToHalt)
+ master().tell(VoteToHaltMessage.instance(), self());
+ else
+ worker(this.neighbor).tell(Terminate.YES, self());
+ } else
+ worker(this.neighbor).tell(this.voteToHalt ? this.terminationToken : Terminate.NO, self());
+ this.terminationToken = null;
this.voteToHalt = true;
}
}).build()
@@ -140,17 +150,20 @@ public final class WorkerTraversalActor extends AbstractActor implements
master().tell(traverser, self());
else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
- final String workerPathString = "../worker-" + otherPartition.hashCode();
- ActorSelection worker = this.workers.get(workerPathString);
- if (null == worker) {
- worker = context().actorSelection(workerPathString);
- this.workers.put(workerPathString, worker);
- }
- worker.tell(traverser, self());
+ worker("../worker-" + otherPartition.hashCode()).tell(traverser, self());
} else
self().tell(traverser, self());
}
+ private ActorSelection worker(final String path) {
+ ActorSelection worker = this.workers.get(path);
+ if (null == worker) {
+ worker = context().actorSelection(path);
+ this.workers.put(path, worker);
+ }
+ return worker;
+ }
+
private ActorRef master() {
return context().parent();
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9a6be81b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
deleted file mode 100644
index 1faa7a0..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class VoteToContinueMessage {
-
- private static final VoteToContinueMessage INSTANCE = new VoteToContinueMessage();
-
- private VoteToContinueMessage() {}
-
- public static VoteToContinueMessage instance() {
- return INSTANCE;
- }
-}