You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/09 14:29:10 UTC
tinkerpop git commit: removed an unneeded interface. cleaned up
TraverserMailbox. Added comments and various clean ups for discussion with
@spmallette.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 07d830060 -> 16c658008
removed an unneeded interface. cleaned up TraverserMailbox. Added comments and various clean ups for discussion with @spmallette.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/16c65800
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/16c65800
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/16c65800
Branch: refs/heads/TINKERPOP-1564
Commit: 16c658008bbda33b480bc1d1a6e5a7bf1912e758
Parents: 07d8300
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Dec 9 07:29:01 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Dec 9 07:29:01 2016 -0700
----------------------------------------------------------------------
.../process/akka/MasterTraversalActor.java | 15 ++---
.../process/akka/TraverserMailbox.java | 41 ++++++-----
.../process/akka/WorkerTraversalActor.java | 71 ++++++++++----------
.../akka/messages/SynchronizationMessage.java | 26 -------
.../akka/messages/VoteToHaltMessage.java | 5 +-
5 files changed, 65 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index e525220..c03cbe1 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -79,12 +79,15 @@ public final class MasterTraversalActor extends AbstractActor implements Require
this.processTraverser(traverser);
}).
match(BarrierAddMessage.class, barrierMerge -> {
+ // get the barrier updates from the workers to synchronize against the master barrier
final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
- assert null == this.barrierLock || this.barrierLock == barrier;
- this.barrierLock = barrier;
+ if (null == this.barrierLock)
+ this.barrierLock = barrier;
+ assert this.barrierLock == barrier;
this.barrierLock.addBarrier(barrierMerge.getBarrier());
}).
match(SideEffectAddMessage.class, sideEffect -> {
+ // get the side-effect updates from the workers to generate the master side-effects
this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
}).
match(VoteToHaltMessage.class, voteToHalt -> {
@@ -95,7 +98,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require
this.sendTraverser(step.next());
}
this.barrierLock = null;
- this.workers.get(this.leaderWorker).tell(WorkerTraversalActor.Terminate.MAYBE, self());
+ this.workers.get(this.leaderWorker).tell(StartMessage.instance(), self());
} else
context().system().terminate();
}).build());
@@ -113,12 +116,6 @@ public final class MasterTraversalActor extends AbstractActor implements Require
}
}
- /*private void broadcast(final Object message) {
- for (final ActorSelection worker : this.workers.values()) {
- worker.tell(message, self());
- }
- }*/
-
private void processTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted() || traverser.get() instanceof Element) {
this.sendTraverser(traverser);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 0e84919..940dfb1 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -29,7 +29,6 @@ import com.typesafe.config.Config;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
import scala.Option;
import java.util.Queue;
@@ -42,49 +41,49 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
- private final TraverserSet<?> traverserSet = new TraverserSet<>(new ConcurrentHashMap<>());
- private final Queue<Envelope> barrierSyncs = new ConcurrentLinkedQueue<>();
- private final Queue<Envelope> haltSyncs = new ConcurrentLinkedQueue<>();
- private final Queue<Envelope> queue = new ConcurrentLinkedQueue<>();
+ private final TraverserSet<?> traverserMessages = new TraverserSet<>(new ConcurrentHashMap<>());
+ private final Queue<Envelope> haltMessages = new ConcurrentLinkedQueue<>();
+ private final Queue<Envelope> otherMessages = new ConcurrentLinkedQueue<>();
private final ActorRef owner;
public TraverserMessageQueue(final ActorRef owner) {
this.owner = owner;
}
- // these must be implemented; queue used as example
public void enqueue(final ActorRef receiver, final Envelope handle) {
if (handle.message() instanceof Traverser.Admin)
- this.traverserSet.offer((Traverser.Admin) handle.message());
- else if (handle.message() instanceof SideEffectAddMessage)
- this.queue.offer(handle);
+ this.traverserMessages.offer((Traverser.Admin) handle.message());
else if (handle.message() instanceof VoteToHaltMessage)
- this.haltSyncs.offer(handle);
+ this.haltMessages.offer(handle);
else
- this.queue.offer(handle);
+ this.otherMessages.offer(handle);
}
public Envelope dequeue() {
- if (!this.queue.isEmpty())
- return this.queue.poll();
- else if (!this.traverserSet.isEmpty())
- return new Envelope(this.traverserSet.poll(), this.owner);
- else if (!this.barrierSyncs.isEmpty())
- return this.barrierSyncs.poll();
+ if (!this.otherMessages.isEmpty())
+ return this.otherMessages.poll();
+ else if (!this.traverserMessages.isEmpty())
+ return new Envelope(this.traverserMessages.poll(), this.owner);
else
- return this.haltSyncs.poll();
+ return this.haltMessages.poll();
}
public int numberOfMessages() {
- return this.queue.size() + this.traverserSet.size() + this.barrierSyncs.size() + this.haltSyncs.size();
+ return this.otherMessages.size() + this.traverserMessages.size() + this.haltMessages.size();
}
public boolean hasMessages() {
- return !this.queue.isEmpty() || !this.traverserSet.isEmpty() || !this.barrierSyncs.isEmpty() || !this.haltSyncs.isEmpty();
+ return !this.otherMessages.isEmpty() || !this.traverserMessages.isEmpty() || !this.haltMessages.isEmpty();
}
public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
- for (final Envelope handle : this.queue) {
+ for (final Envelope handle : this.otherMessages) {
+ deadLetters.enqueue(owner, handle);
+ }
+ for (final Traverser.Admin<?> traverser : this.traverserMessages) {
+ deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
+ }
+ for (final Envelope handle : this.haltMessages) {
deadLetters.enqueue(owner, handle);
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 0107273..ad3f8d1 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -48,32 +48,29 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
public enum Terminate {MAYBE, YES, NO}
- private TraversalMatrix<?, ?> matrix = null;
+ private final TraversalMatrix<?, ?> matrix;
private final Partition localPartition;
private final Partitioner partitioner;
- private boolean voteToHalt = true;
+ //
private final Map<String, ActorSelection> workers = new HashMap<>();
- private String neighbor = null;
- private boolean leader = false;
- private Terminate terminationToken = null;
- ///
+ private final String neighborWorker;
+ private boolean isLeader;
+ private Terminate terminate = null;
+ private boolean voteToHalt = true;
private Barrier barrierLock = null;
public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
System.out.println("worker[created]: " + self().path());
- this.matrix = new TraversalMatrix<>(traversal);
- this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+ // set up partition and traversal information
this.localPartition = localPartition;
this.partitioner = partitioner;
+ this.matrix = new TraversalMatrix<>(traversal);
+ this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+ ((GraphStep) traversal.getStartStep()).setIteratorSupplier(this.localPartition::vertices);
// create termination ring topology
- for (int i = 0; i < this.partitioner.getPartitions().size(); i++) {
- if (this.partitioner.getPartitions().get(i) == this.localPartition) {
- this.neighbor = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
- this.leader = i == 0;
- break;
- }
- }
- ((GraphStep) traversal.getStartStep()).setIteratorSupplier(localPartition::vertices);
+ final int i = this.partitioner.getPartitions().indexOf(this.localPartition);
+ this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
+ this.isLeader = i == 0;
receive(ReceiveBuilder.
match(StartMessage.class, start -> {
@@ -83,19 +80,22 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
this.sendTraverser(step.next());
}
// internal vote to have in mailbox as final message to process
- if (this.leader)
- self().tell(Terminate.MAYBE, self());
+ assert null == this.terminate;
+ if (this.isLeader) {
+ this.terminate = Terminate.MAYBE;
+ self().tell(VoteToHaltMessage.instance(), self());
+ }
}).
match(Traverser.Admin.class, traverser -> {
this.voteToHalt = false;
this.processTraverser(traverser);
}).
match(SideEffectAddMessage.class, sideEffect -> {
- // TODO
+ // TODO: this is to update the local worker side-effects. necessary only for traversals that introspect on side-effects
}).
match(Terminate.class, terminate -> {
- assert this.leader || this.terminationToken != Terminate.MAYBE;
- this.terminationToken = terminate;
+ assert this.isLeader || this.terminate != Terminate.MAYBE;
+ this.terminate = terminate;
self().tell(VoteToHaltMessage.instance(), self());
}).
match(VoteToHaltMessage.class, haltSync -> {
@@ -104,17 +104,17 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
while (this.barrierLock.hasNextBarrier()) {
master().tell(new BarrierAddMessage(this.barrierLock), self());
}
- self().tell(VoteToHaltMessage.instance(), self());
- } else if (null != this.terminationToken) {
- // use termination token to determine termination condition
- if (this.leader) {
- if (this.terminationToken == Terminate.YES && this.voteToHalt)
+ }
+ // use termination token to determine termination condition
+ if (null != this.terminate) {
+ if (this.isLeader) {
+ if (this.voteToHalt && Terminate.YES == this.terminate)
master().tell(VoteToHaltMessage.instance(), self());
else
- worker(this.neighbor).tell(Terminate.YES, self());
+ worker(this.neighborWorker).tell(Terminate.YES, self());
} else
- worker(this.neighbor).tell(this.voteToHalt ? this.terminationToken : Terminate.NO, self());
- this.terminationToken = null;
+ worker(this.neighborWorker).tell(this.voteToHalt ? this.terminate : Terminate.NO, self());
+ this.terminate = null;
this.voteToHalt = true;
}
}).build()
@@ -126,8 +126,9 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
step.addStart(traverser);
if (step instanceof Barrier) {
- assert null == this.barrierLock || step == this.barrierLock;
- this.barrierLock = (Barrier) step;
+ if (null == this.barrierLock)
+ this.barrierLock = (Barrier) step;
+ assert step == this.barrierLock;
} else {
while (step.hasNext()) {
this.sendTraverser(step.next());
@@ -145,11 +146,11 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
self().tell(traverser, self());
}
- private ActorSelection worker(final String path) {
- ActorSelection worker = this.workers.get(path);
+ private ActorSelection worker(final String workerPath) {
+ ActorSelection worker = this.workers.get(workerPath);
if (null == worker) {
- worker = context().actorSelection(path);
- this.workers.put(path, worker);
+ worker = context().actorSelection(workerPath);
+ this.workers.put(workerPath, worker);
}
return worker;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SynchronizationMessage.java
deleted file mode 100644
index 54e7ade..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SynchronizationMessage.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface SynchronizationMessage {
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
index f7d8970..767699c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
@@ -22,11 +22,12 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class VoteToHaltMessage implements SynchronizationMessage {
+public final class VoteToHaltMessage {
private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage();
- private VoteToHaltMessage() {}
+ private VoteToHaltMessage() {
+ }
public static VoteToHaltMessage instance() {
return INSTANCE;