You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/13 18:56:23 UTC
[07/50] [abbrv] tinkerpop git commit: We now have a fully generalized
process/actor interface system where akka-gremlin/ implements those
interfaces. Next,
we have ActorProgram with TraversalActorProgram executing a traversal. This is
identical in form to ...
We now have a fully generalized process/actor interface system where akka-gremlin/ implements those interfaces. Next, we have ActorProgram with TraversalActorProgram executing a traversal. This is identical in form to GraphComputer, where people can submit arbitrary ActorPrograms. This is really clean and consistent with our other work.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/48ffd25a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/48ffd25a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/48ffd25a
Branch: refs/heads/TINKERPOP-1564
Commit: 48ffd25a560a8cf675e8c6858be808ff073fe0fb
Parents: a356d38
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Dec 13 09:52:41 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 13 11:55:55 2017 -0700
----------------------------------------------------------------------
.../gremlin/akka/process/actor/AkkaActors.java | 22 ++-
.../process/actor/MasterTraversalActor.java | 162 +++------------
.../akka/process/actor/TraverserMailbox.java | 5 +-
.../process/actor/WorkerTraversalActor.java | 180 ++++-------------
.../actor/WorkerTraversalSideEffects.java | 147 --------------
.../actor/message/BarrierAddMessage.java | 47 -----
.../actor/message/BarrierDoneMessage.java | 41 ----
.../actor/message/SideEffectAddMessage.java | 43 ----
.../actor/message/SideEffectSetMessage.java | 42 ----
.../process/actor/message/StartMessage.java | 35 ----
.../actor/message/VoteToHaltMessage.java | 36 ----
.../gremlin/akka/process/AkkaPlayTest.java | 2 +-
.../tinkerpop/gremlin/process/actor/Actor.java | 30 ++-
.../gremlin/process/actor/ActorProgram.java | 50 +++++
.../tinkerpop/gremlin/process/actor/Actors.java | 2 +
.../gremlin/process/actor/Address.java | 62 ++++++
.../gremlin/process/actor/MasterActor.java | 35 ----
.../gremlin/process/actor/WorkerActor.java | 35 ----
.../actor/traversal/TraversalActorProgram.java | 66 +++++++
.../actor/traversal/TraversalMasterProgram.java | 154 +++++++++++++++
.../actor/traversal/TraversalWorkerProgram.java | 196 +++++++++++++++++++
.../traversal/WorkerTraversalSideEffects.java | 147 ++++++++++++++
.../traversal/message/BarrierAddMessage.java | 47 +++++
.../traversal/message/BarrierDoneMessage.java | 41 ++++
.../traversal/message/SideEffectAddMessage.java | 43 ++++
.../traversal/message/SideEffectSetMessage.java | 42 ++++
.../actor/traversal/message/StartMessage.java | 35 ++++
.../traversal/message/VoteToHaltMessage.java | 36 ++++
.../actor/traversal/step/map/ActorStep.java | 4 +-
29 files changed, 1035 insertions(+), 752 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
index 8ef96bb..db024f6 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -21,8 +21,9 @@ package org.apache.tinkerpop.gremlin.akka.process.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actor.Actors;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
@@ -34,12 +35,19 @@ import java.util.concurrent.Future;
*/
public final class AkkaActors<S, E> implements Actors<S, E> {
- public final ActorSystem system;
- private TraverserSet<E> results = new TraverserSet<>();
+ private final ActorProgram actorProgram;
+ private final ActorSystem system;
+ private final Address.Master master;
- public AkkaActors(final Traversal.Admin<S, E> traversal, final Partitioner partitioner) {
- this.system = ActorSystem.create("traversal-" + traversal.hashCode());
- this.system.actorOf(Props.create(MasterTraversalActor.class, traversal.clone(), partitioner, this.results), "master");
+ public AkkaActors(final ActorProgram actorProgram, final Partitioner partitioner) {
+ this.actorProgram = actorProgram;
+ this.system = ActorSystem.create("traversal-" + actorProgram.hashCode());
+ this.master = new Address.Master(this.system.actorOf(Props.create(MasterTraversalActor.class, this.actorProgram, partitioner), "master").path().toString());
+ }
+
+ @Override
+ public Address.Master master() {
+ return this.master;
}
@Override
@@ -48,7 +56,7 @@ public final class AkkaActors<S, E> implements Actors<S, E> {
while (!this.system.isTerminated()) {
}
- return this.results;
+ return (TraverserSet) this.actorProgram.getResult();
});
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
index 5009554..6799a28 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
@@ -20,35 +20,17 @@
package org.apache.tinkerpop.gremlin.akka.process.actor;
import akka.actor.AbstractActor;
-import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.RequiresMessageQueue;
import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
-import org.apache.tinkerpop.gremlin.process.actor.MasterActor;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,131 +38,47 @@ import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, MasterActor {
+public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master {
- private final Traversal.Admin<?, ?> traversal;
- private final TraversalMatrix<?, ?> matrix;
- private final Partitioner partitioner;
- private final Map<String, ActorSelection> workers = new HashMap<>();
- private Map<String, Barrier> barriers = new HashMap<>();
- private final TraverserSet<?> results;
- private final String leaderWorker;
+ private final Address.Master master;
+ private final List<Address.Worker> workers;
+ private final Map<Address, ActorSelection> actors = new HashMap<>();
- public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
- System.out.println("master[created]: " + self().path());
- final TraversalStrategies strategies = traversal.getStrategies().clone();
- strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class);
- strategies.addStrategies(ActorVerificationStrategy.instance());
- traversal.setStrategies(strategies);
- traversal.applyStrategies();
-
- this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
- System.out.println(this.traversal);
- this.matrix = new TraversalMatrix<>(this.traversal);
- this.partitioner = partitioner;
- this.results = results;
- this.initializeWorkers();
- this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode();
-
- receive(ReceiveBuilder.
- match(Traverser.Admin.class, this::processTraverser).
- match(BarrierAddMessage.class, barrierMerge -> this.processBarrierAdd((Barrier) this.matrix.getStepById(barrierMerge.getStepId()), barrierMerge.getBarrier())).
- match(SideEffectAddMessage.class, sideEffect -> this.processSideEffectAdd(((SideEffectAddMessage) sideEffect).getKey(), ((SideEffectAddMessage) sideEffect).getValue())).
- match(VoteToHaltMessage.class, voteToHalt -> this.processVoteToHalt()).
- build());
- }
-
- private void initializeWorkers() {
- final List<Partition> partitions = this.partitioner.getPartitions();
+ public MasterTraversalActor(final ActorProgram program, final Partitioner partitioner) {
+ this.master = new Address.Master(self().path().toString());
+ this.workers = new ArrayList<>();
+ final List<Partition> partitions = partitioner.getPartitions();
for (final Partition partition : partitions) {
- final String workerPathString = "worker-" + partition.hashCode();
- final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), workerPathString);
- this.workers.put(workerPathString, context().actorSelection(worker.path()));
- }
- for (final ActorSelection worker : this.workers.values()) {
- worker.tell(StartMessage.instance(), self());
+ this.workers.add(new Address.Worker("worker-" + partition.hashCode()));
+ context().actorOf(Props.create(WorkerTraversalActor.class, program, partitioner, partition), "worker-" + partition.hashCode());
}
- this.workers.clear();
+ final ActorProgram.Master masterProgram = program.createMasterProgram(this);
+ receive(ReceiveBuilder.matchAny(masterProgram::execute).build());
+ masterProgram.setup();
}
@Override
- public void processBarrierAdd(final Barrier barrier, final Object barrierAddition) {
- final Step<?, ?> step = (Step) barrier;
- GraphComputing.atMaster(step, true);
- barrier.addBarrier(barrierAddition);
- this.barriers.put(step.getId(), barrier);
+ public <M> void send(final Address toActor, final M message) {
+ ActorSelection actor = this.actors.get(toActor);
+ if (null == actor) {
+ actor = context().actorSelection(toActor.location());
+ this.actors.put(toActor, actor);
+ }
+ actor.tell(message, self());
}
@Override
- public void processSideEffectAdd(final String key, final Object value) {
- this.traversal.getSideEffects().add(key, value);
+ public List<Address.Worker> workers() {
+ return this.workers;
}
@Override
- public void processVoteToHalt() {
- assert !sender().equals(self());
- if (!this.barriers.isEmpty()) {
- for (final Barrier barrier : this.barriers.values()) {
- final Step<?, ?> step = (Step) barrier;
- if (!(barrier instanceof LocalBarrier)) {
- while (step.hasNext()) {
- this.sendTraverser(step.next());
- }
- } else {
- this.traversal.getSideEffects().forEach((k, v) -> {
- this.broadcast(new SideEffectSetMessage(k, v));
- });
- this.broadcast(new BarrierDoneMessage(barrier));
- barrier.done();
- }
- }
- this.barriers.clear();
- worker(this.leaderWorker).tell(StartMessage.instance(), self());
- } else {
- while (this.traversal.hasNext()) {
- this.results.add((Traverser.Admin) this.traversal.nextTraverser());
- }
- context().system().terminate();
- }
+ public Address.Master address() {
+ return this.master;
}
@Override
- public void processTraverser(final Traverser.Admin traverser) {
- if (traverser.isHalted() || traverser.get() instanceof Element) {
- this.sendTraverser(traverser);
- } else {
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- GraphComputing.atMaster(step, true);
- step.addStart(traverser);
- while (step.hasNext()) {
- this.processTraverser(step.next());
- }
- }
- }
-
- ////////////////
-
- private void broadcast(final Object message) {
- for (final Partition partition : this.partitioner.getPartitions()) {
- worker("worker-" + partition.hashCode()).tell(message, self());
- }
- }
-
- private void sendTraverser(final Traverser.Admin traverser) {
- if (traverser.isHalted())
- this.results.add(traverser);
- else if (traverser.get() instanceof Element)
- worker("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()).tell(traverser, self());
- else
- self().tell(traverser, self());
- }
-
- private ActorSelection worker(final String workerPath) {
- ActorSelection worker = this.workers.get(workerPath);
- if (null == worker) {
- worker = context().actorSelection(workerPath);
- this.workers.put(workerPath, worker);
- }
- return worker;
+ public void close() {
+ context().system().terminate();
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
index 6a6c0f4..671f3a2 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java
@@ -26,7 +26,8 @@ import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalWorkerProgram;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import scala.Option;
@@ -58,7 +59,7 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
else if (handle.message() instanceof VoteToHaltMessage) {
assert null == this.haltMessage;
this.haltMessage = handle;
- } else if (handle.message() instanceof WorkerTraversalActor.Terminate) {
+ } else if (handle.message() instanceof TraversalWorkerProgram.Terminate) {
assert null == this.terminateToken;
this.terminateToken = handle;
} else
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
index 862b6b3..5a6bae7 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
@@ -20,184 +20,72 @@
package org.apache.tinkerpop.gremlin.akka.process.actor;
import akka.actor.AbstractActor;
-import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.dispatch.RequiresMessageQueue;
import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage;
-import org.apache.tinkerpop.gremlin.process.actor.WorkerActor;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, WorkerActor {
+public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker {
- // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm)
- public enum Terminate {
- MAYBE, YES, NO
- }
-
- private final TraversalMatrix<?, ?> matrix;
private final Partition localPartition;
- private final Partitioner partitioner;
- //
- private final Map<String, ActorSelection> workers = new HashMap<>();
- private final String neighborWorker;
- private boolean isLeader;
- private Terminate terminate = null;
- private boolean voteToHalt = false;
- private Map<String, Barrier> barriers = new HashMap<>();
+ private final Address.Worker self;
+ private final Address.Master master;
+ private final List<Address.Worker> workers;
+ private final Map<Address, ActorSelection> actors = new HashMap<>();
- public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
- System.out.println("worker[created]: " + self().path());
- // set up partition and traversal information
+ public WorkerTraversalActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) {
this.localPartition = localPartition;
- this.partitioner = partitioner;
- final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), context());
- TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
- this.matrix = new TraversalMatrix<>(traversal);
- final GraphStep graphStep = (GraphStep) traversal.getStartStep();
- if (0 == graphStep.getIds().length)
- ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges);
- else {
- if (graphStep.returnsVertex())
- ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
- () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains));
- else
- ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
- () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains));
+ this.self = new Address.Worker(self().path().toString());
+ this.master = new Address.Master(context().parent().path().toString());
+ this.workers = new ArrayList<>();
+ for (final Partition partition : partitioner.getPartitions()) {
+ this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
}
- // create termination ring topology
- final int i = this.partitioner.getPartitions().indexOf(this.localPartition);
- this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
- this.isLeader = i == 0;
-
- receive(ReceiveBuilder.
- match(StartMessage.class, start -> this.processStart()).
- match(Traverser.Admin.class, this::processTraverser).
- match(SideEffectSetMessage.class, sideEffect -> this.processSideEffectSet(sideEffect.getKey(), sideEffect.getValue())).
- match(BarrierDoneMessage.class, barrierDone -> this.processBarrierDone(this.matrix.getStepById(barrierDone.getStepId()))).
- match(Terminate.class, terminate -> {
- assert this.isLeader || this.terminate != Terminate.MAYBE;
- this.terminate = terminate;
- self().tell(VoteToHaltMessage.instance(), self());
- }).
- match(VoteToHaltMessage.class, haltSync -> {
- // if there is a barrier and thus, halting at barrier, then process barrier
- if (!this.barriers.isEmpty()) {
- for (final Barrier barrier : this.barriers.values()) {
- while (barrier.hasNextBarrier()) {
- master().tell(new BarrierAddMessage(barrier), self());
- }
- }
- this.barriers.clear();
- this.voteToHalt = false;
- }
- // use termination token to determine termination condition
- if (null != this.terminate) {
- if (this.isLeader) {
- if (this.voteToHalt && Terminate.YES == this.terminate)
- master().tell(VoteToHaltMessage.instance(), self());
- else
- worker(this.neighborWorker).tell(Terminate.YES, self());
- } else
- worker(this.neighborWorker).tell(this.voteToHalt ? this.terminate : Terminate.NO, self());
- this.terminate = null;
- this.voteToHalt = true;
- }
- }).build()
- );
+ ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
+ receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
+ workerProgram.setup();
}
@Override
- public void processStart() {
- // initial message from master that says: "start processing"
- final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
- while (step.hasNext()) {
- this.sendTraverser(step.next());
- }
- // internal vote to have in mailbox as final message to process
- assert null == this.terminate;
- if (this.isLeader) {
- this.terminate = Terminate.MAYBE;
- self().tell(VoteToHaltMessage.instance(), self());
+ public <M> void send(final Address toActor, final M message) {
+ ActorSelection actor = this.actors.get(toActor);
+ if (null == actor) {
+ actor = context().actorSelection(toActor.location());
+ this.actors.put(toActor, actor);
}
+ actor.tell(message, self());
}
@Override
- public void processTraverser(final Traverser.Admin traverser) {
- assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
- GraphComputing.atMaster(step, false);
- step.addStart(traverser);
- if (step instanceof Barrier) {
- this.barriers.put(step.getId(), (Barrier) step);
- } else {
- while (step.hasNext()) {
- this.sendTraverser(step.next());
- }
- }
+ public List<Address.Worker> workers() {
+ return this.workers;
}
@Override
- public void processBarrierDone(final Barrier barrier) {
- final Step<?, ?> step = (Step) barrier;
- while (step.hasNext()) {
- sendTraverser(step.next());
- }
+ public Partition partition() {
+ return this.localPartition;
}
@Override
- public void processSideEffectSet(final String key, final Object value) {
- this.matrix.getTraversal().getSideEffects().set(key, value);
+ public Address.Worker address() {
+ return this.self;
}
- //////////////////////
-
- private void sendTraverser(final Traverser.Admin traverser) {
- this.voteToHalt = false;
- if (traverser.isHalted())
- master().tell(traverser, self());
- else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get()))
- worker("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()).tell(traverser, self());
- else
- self().tell(traverser, self());
- }
-
- private ActorSelection worker(final String workerPath) {
- ActorSelection worker = this.workers.get(workerPath);
- if (null == worker) {
- worker = context().actorSelection(workerPath);
- this.workers.put(workerPath, worker);
- }
- return worker;
- }
-
- private ActorRef master() {
- return context().parent();
+ @Override
+ public Address.Master master() {
+ return this.master;
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
deleted file mode 100644
index 9c03298..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.ActorContext;
-import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.BinaryOperator;
-import java.util.function.Supplier;
-import java.util.function.UnaryOperator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class WorkerTraversalSideEffects implements TraversalSideEffects {
-
- private TraversalSideEffects sideEffects;
- private ActorContext context;
-
-
- private WorkerTraversalSideEffects() {
- // for serialization
- }
-
- public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext context) {
- this.sideEffects = sideEffects;
- this.context = context;
- }
-
- public TraversalSideEffects getSideEffects() {
- return this.sideEffects;
- }
-
- @Override
- public void set(final String key, final Object value) {
- this.sideEffects.set(key, value);
- }
-
- @Override
- public <V> V get(final String key) throws IllegalArgumentException {
- return this.sideEffects.get(key);
- }
-
- @Override
- public void remove(final String key) {
- this.sideEffects.remove(key);
- }
-
- @Override
- public Set<String> keys() {
- return this.sideEffects.keys();
- }
-
- @Override
- public void add(final String key, final Object value) {
- this.context.parent().tell(new SideEffectAddMessage(key, value), this.context.self());
- }
-
- @Override
- public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
- this.sideEffects.register(key, initialValue, reducer);
- }
-
- @Override
- public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
- this.sideEffects.registerIfAbsent(key, initialValue, reducer);
- }
-
- @Override
- public <V> BinaryOperator<V> getReducer(final String key) {
- return this.sideEffects.getReducer(key);
- }
-
- @Override
- public <V> Supplier<V> getSupplier(final String key) {
- return this.sideEffects.getSupplier(key);
- }
-
- @Override
- @Deprecated
- public void registerSupplier(final String key, final Supplier supplier) {
- this.sideEffects.registerSupplier(key, supplier);
- }
-
- @Override
- @Deprecated
- public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
- return this.sideEffects.getRegisteredSupplier(key);
- }
-
- @Override
- public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
- this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
- }
-
- @Override
- public <S> Supplier<S> getSackInitialValue() {
- return this.sideEffects.getSackInitialValue();
- }
-
- @Override
- public <S> UnaryOperator<S> getSackSplitter() {
- return this.sideEffects.getSackSplitter();
- }
-
- @Override
- public <S> BinaryOperator<S> getSackMerger() {
- return this.sideEffects.getSackMerger();
- }
-
- @Override
- public TraversalSideEffects clone() {
- try {
- final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone();
- clone.sideEffects = this.sideEffects.clone();
- return clone;
- } catch (final CloneNotSupportedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- @Override
- public void mergeInto(final TraversalSideEffects sideEffects) {
- this.sideEffects.mergeInto(sideEffects);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
deleted file mode 100644
index 4a351c1..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierAddMessage {
-
- private final Object barrier;
- private final String stepId;
-
- public BarrierAddMessage(final Barrier barrier) {
- this.barrier = barrier.nextBarrier();
- this.stepId = ((Step) barrier).getId();
- }
-
- public Object getBarrier() {
- return this.barrier;
- }
-
- public String getStepId() {
- return this.stepId;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
deleted file mode 100644
index 208b346..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierDoneMessage {
-
- private final String stepId;
-
- public BarrierDoneMessage(final Barrier barrier) {
- this.stepId = ((Step) barrier).getId();
-
- }
-
- public String getStepId() {
- return this.stepId;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
deleted file mode 100644
index 2d97bfa..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectAddMessage {
-
- private final String key;
- private final Object value;
-
- public SideEffectAddMessage(final String key, final Object value) {
- this.value = value;
- this.key = key;
- }
-
- public String getKey() {
- return this.key;
- }
-
- public Object getValue() {
- return this.value;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
deleted file mode 100644
index 023133b..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectSetMessage {
-
- private final String key;
- private final Object value;
-
- public SideEffectSetMessage(final String key, final Object value) {
- this.key = key;
- this.value = value;
- }
-
- public String getKey() {
- return this.key;
- }
-
- public Object getValue() {
- return this.value;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
deleted file mode 100644
index ebc469c..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class StartMessage {
-
- private static final StartMessage INSTANCE = new StartMessage();
-
- private StartMessage() {
- }
-
- public static StartMessage instance() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
deleted file mode 100644
index 8bfa4c9..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class VoteToHaltMessage {
-
- private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage();
-
- private VoteToHaltMessage() {
- }
-
- public static VoteToHaltMessage instance() {
- return INSTANCE;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
index bed7636..f94083c 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
@@ -42,7 +42,7 @@ public class AkkaPlayTest {
final Graph graph = TinkerGraph.open();
graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
GraphTraversalSource g = graph.traversal().withStrategies(new ActorStrategy(AkkaActors.class, new HashPartitioner(graph.partitioner(), 3)));
- System.out.println(g.V(1, 2).union(outE().count(), inE().count(), (Traversal) outE().values("weight").sum()).toList());
+ System.out.println(g.V().values("name").toList());
//3, 1.9, 1
/*for (int i = 0; i < 10000; i++) {
final Graph graph = TinkerGraph.open();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
index aa2f429..2552883 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
@@ -19,13 +19,39 @@
package org.apache.tinkerpop.gremlin.process.actor;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+
+import java.util.List;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public interface Actor {
- public <S> void processTraverser(final Traverser.Admin<S> traverser);
+ public Address address(); // the Address associated with this actor
+
+ public <M> void send(final Address toActor, final M message); // hands a message of arbitrary type M to the actor at toActor
+
+ public interface Master extends Actor { // master-side specialization of Actor
+
+ public List<Address.Worker> workers(); // addresses of the worker actors
+
+ public Address.Master address(); // narrows Actor#address() to a master address
+
+ public void close(); // NOTE(review): presumably ends the actor computation; confirm against the implementing module
+
+ }
+
+ public interface Worker extends Actor { // worker-side specialization of Actor
+
+ public Address.Worker address(); // narrows Actor#address() to a worker address
+
+ public Address.Master master(); // address of the master actor
+
+ public List<Address.Worker> workers(); // addresses of the worker actors
+
+ public Partition partition(); // NOTE(review): presumably the graph partition this worker is responsible for; confirm
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
new file mode 100644
index 0000000..b8f7ac1
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface ActorProgram<M> { // M is the type returned by getResult()
+
+ public Worker createWorkerProgram(final Actor.Worker worker); // NOTE(review): returns the raw Worker type; the nested interface is generic
+
+ public Master createMasterProgram(final Actor.Master master); // NOTE(review): returns the raw Master type; the nested interface is generic
+
+ public M getResult(); // result of the program, typed by the interface parameter
+
+ public static interface Worker<M> { // here M is the type of message passed to execute(), a separate type parameter from ActorProgram's M
+ public void setup(); // lifecycle hook; NOTE(review): presumably invoked once before any execute() call, confirm against the actor implementation
+
+ public void execute(final M message); // handles a single message
+
+ public void terminate(); // lifecycle hook; NOTE(review): presumably invoked once when the worker finishes, confirm
+
+ }
+
+ public static interface Master<M> { // same three-method shape as Worker; M is again the message type
+ public void setup(); // lifecycle hook; NOTE(review): presumably invoked once before any execute() call, confirm
+
+ public void execute(final M message); // handles a single message
+
+ public void terminate(); // lifecycle hook; NOTE(review): presumably invoked once when the master finishes, confirm
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
index d9e257e..2e410ec 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
@@ -28,5 +28,7 @@ import java.util.concurrent.Future;
*/
public interface Actors<S, E> {
+ public Address.Master master(); // address of the master actor
+
public Future<TraverserSet<E>> submit();
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
new file mode 100644
index 0000000..c598eb7
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class Address implements Serializable { // serializable value wrapping a location string; NOTE(review): no serialVersionUID is declared
+
+ private final String location; // the only state; drives equals() and hashCode()
+
+ public Address(final String location) {
+ this.location = location; // stored as given, no null check
+ }
+
+ public String location() {
+ return this.location;
+ }
+
+ public boolean equals(final Object other) { // overrides Object#equals (no @Override annotation)
+ return other instanceof Address && ((Address) other).location.equals(this.location); // NOTE(review): the concrete subclass is not compared, so a Master and a Worker with the same location are equal; confirm this is intended
+ }
+
+ public int hashCode() { // consistent with equals(): derived from location only
+ return this.location.hashCode();
+ }
+
+ public static class Master extends Address { // marker subtype for master addresses; adds no state
+
+ public Master(final String location) {
+ super(location);
+ }
+
+ }
+
+ public static class Worker extends Address { // marker subtype for worker addresses; adds no state
+
+ public Worker(final String location) {
+ super(location);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
deleted file mode 100644
index 87efe51..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface MasterActor extends Actor {
-
- public <V> void processBarrierAdd(final Barrier barrier, final V barrierAddition);
-
- public <V> void processSideEffectAdd(final String key, final V value);
-
- public void processVoteToHalt();
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
deleted file mode 100644
index 6d4ca64..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface WorkerActor extends Actor {
-
- public void processStart();
-
- public void processBarrierDone(final Barrier barrier);
-
- public void processSideEffectSet(final String key, final Object value);
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
new file mode 100644
index 0000000..278fb3b
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalActorProgram<M> implements ActorProgram<M> { // ActorProgram whose worker and master programs execute a traversal
+
+ private final Traversal.Admin<?, ?> traversal; // the traversal pulled out of the start step in the constructor
+ private final Partitioner partitioner;
+ public TraverserSet<?> result = new TraverserSet<>(); // NOTE(review): public and non-final; this same instance is passed to the master program and returned by getResult()
+
+ public TraversalActorProgram(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+ this.partitioner = partitioner;
+ final TraversalStrategies strategies = traversal.getStrategies().clone(); // edit a clone rather than the strategies object obtained from the traversal
+ strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class); // drop the computer and standard verification strategies
+ strategies.addStrategies(ActorVerificationStrategy.instance()); // and add the actor verification strategy
+ traversal.setStrategies(strategies);
+ traversal.applyStrategies();
+ this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get(); // assumes the start step is a TraversalVertexProgramStep once strategies are applied; the cast throws otherwise
+ }
+
+ @Override
+ public Worker<M> createWorkerProgram(final Actor.Worker worker) {
+ return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner); // every worker program receives its own clone of the traversal
+ }
+
+ @Override
+ public Master createMasterProgram(final Actor.Master master) { // NOTE(review): raw Master return type, unlike createWorkerProgram
+ return new TraversalMasterProgram<>(master, this.traversal.clone(), this.partitioner, this.result); // the master gets a traversal clone plus the shared result set
+ }
+
+ @Override
+ public M getResult() {
+ return (M) this.result; // unchecked cast: the TraverserSet is returned as M
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
new file mode 100644
index 0000000..654969b
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TraversalMasterProgram<M> implements ActorProgram.Master<M> { // master-side message handler for a traversal run over actors
+
+ private final Actor.Master master; // used to send messages, list workers and close
+ private final Map<String, Address.Worker> workers = new HashMap<>(); // worker addresses keyed by location; filled in setup()
+ private final Traversal.Admin<?, ?> traversal;
+ private final TraversalMatrix<?, ?> matrix; // step lookup by id for this.traversal
+ private final Partitioner partitioner;
+ private Map<String, Barrier> barriers = new HashMap<>(); // barriers that received additions since the last halt vote, keyed by step id
+ private final TraverserSet<?> results; // supplied by the caller; halted traversers are added here
+ private final String leaderWorker; // location key of the worker that is re-started after barriers are processed
+
+ public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
+ this.traversal = traversal;
+ System.out.println("master[created]: " + master.address().location()); // NOTE(review): debug output on stdout
+ System.out.println(this.traversal); // NOTE(review): debug output on stdout
+ this.matrix = new TraversalMatrix<>(this.traversal);
+ this.partitioner = partitioner;
+ this.results = results;
+ this.master = master;
+ this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode(); // first partition picks the leader; assumes worker locations follow the "worker-<partition hashCode>" convention
+ }
+
+ @Override
+ public void setup() {
+ for (final Address.Worker worker : master.workers()) {
+ this.workers.put(worker.location(), worker); // index workers by location for later lookup
+ }
+ this.broadcast(StartMessage.instance()); // send the start message to every worker
+
+ }
+
+ @Override
+ public void execute(final M message) { // dispatches on the runtime type of the message
+ if (message instanceof Traverser.Admin) {
+ this.processTraverser((Traverser.Admin) message);
+ } else if (message instanceof BarrierAddMessage) {
+ final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId()); // resolve the barrier step named by the message
+ final Step<?, ?> step = (Step) barrier;
+ GraphComputing.atMaster(step, true);
+ barrier.addBarrier(((BarrierAddMessage) message).getBarrier()); // merge the barrier carried by the message into the master-side step
+ this.barriers.put(step.getId(), barrier); // remember it for the next VoteToHaltMessage
+ } else if (message instanceof SideEffectAddMessage) {
+ this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue());
+ } else if (message instanceof VoteToHaltMessage) {
+ if (!this.barriers.isEmpty()) { // pending barriers: process them and start another round instead of finishing
+ for (final Barrier barrier : this.barriers.values()) {
+ final Step<?, ?> step = (Step) barrier;
+ if (!(barrier instanceof LocalBarrier)) {
+ while (step.hasNext()) { // non-local barrier: drain the step and route each traverser
+ this.sendTraverser(step.next());
+ }
+ } else {
+ this.traversal.getSideEffects().forEach((k, v) -> { // local barrier: push every side-effect entry to all workers
+ this.broadcast(new SideEffectSetMessage(k, v));
+ });
+ this.broadcast(new BarrierDoneMessage(barrier)); // then tell the workers this barrier is done
+ barrier.done();
+ }
+ }
+ this.barriers.clear();
+ this.master.send(this.workers.get(this.leaderWorker), StartMessage.instance()); // NOTE(review): workers.get() yields null if no worker location matches leaderWorker
+ } else {
+ while (this.traversal.hasNext()) { // no pending barriers: drain the traversal into the result set
+ this.results.add((Traverser.Admin) this.traversal.nextTraverser());
+ }
+ this.master.close(); // and close the master
+ }
+ } else {
+ throw new IllegalStateException("Unknown message:" + message);
+ }
+ }
+
+ @Override
+ public void terminate() { // intentionally empty in this revision
+
+ }
+
+ private void broadcast(final Object message) { // sends the same message to every known worker
+ for (final Address.Worker worker : this.workers.values()) {
+ this.master.send(worker, message);
+ }
+ }
+
+ private void processTraverser(final Traverser.Admin traverser) {
+ if (traverser.isHalted() || traverser.get() instanceof Element) { // halted or element-bound traversers are routed, not processed here
+ this.sendTraverser(traverser);
+ } else {
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); // the step the traverser is currently at
+ GraphComputing.atMaster(step, true);
+ step.addStart(traverser);
+ while (step.hasNext()) {
+ this.processTraverser(step.next()); // recurse on each traverser the step emits
+ }
+ }
+ }
+
+ private void sendTraverser(final Traverser.Admin traverser) { // routes a traverser: results, owning worker, or back to the master
+ if (traverser.isHalted())
+ this.results.add(traverser);
+ else if (traverser.get() instanceof Element)
+ this.master.send(this.workers.get("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser); // worker chosen by the element's partition, using the same "worker-<hashCode>" key convention as leaderWorker
+ else
+ this.master.send(this.master.address(), traverser); // non-element value: the master sends it to its own address
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
new file mode 100644
index 0000000..58e06d6
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
+
+ // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm)
+ public enum Terminate {
+ MAYBE, YES, NO
+ }
+
+ private final Actor.Worker self;
+ private final TraversalMatrix<?, ?> matrix;
+ private final Partition localPartition;
+ private final Partitioner partitioner;
+ //
+ private final Map<String, Address.Worker> workers = new HashMap<>();
+ private final String neighborWorker;
+ private boolean isLeader;
+ private Terminate terminate = null;
+ private boolean voteToHalt = false;
+ private Map<String, Barrier> barriers = new HashMap<>();
+
+ public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+ this.self = self;
+ System.out.println("worker[created]: " + this.self.address().location());
+ // set up partition and traversal information
+ this.localPartition = self.partition();
+ this.partitioner = partitioner;
+ final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
+ TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
+ this.matrix = new TraversalMatrix<>(traversal);
+ final GraphStep graphStep = (GraphStep) traversal.getStartStep();
+ if (0 == graphStep.getIds().length)
+ ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges);
+ else {
+ if (graphStep.returnsVertex())
+ ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
+ () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains));
+ else
+ ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
+ () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains));
+ }
+ // create termination ring topology
+ final int i = this.partitioner.getPartitions().indexOf(this.localPartition);
+ this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
+ this.isLeader = i == 0;
+ for (final Address.Worker worker : self.workers()) {
+ //if (!worker.equals(this.self.address()))
+ this.workers.put(worker.location(), worker);
+ }
+ }
+
+ @Override
+ public void setup() {
+
+ }
+
+ @Override
+ public void execute(final M message) {
+ //System.out.println(message + "::" + this.isLeader);
+ if (message instanceof StartMessage) {
+ // initial message from master that says: "start processing"
+ final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
+ // internal vote to have in mailbox as final message to process
+ // assert null == this.terminate;
+ if (this.isLeader) {
+ this.terminate = Terminate.MAYBE;
+ this.self.send(this.self.address(), VoteToHaltMessage.instance());
+ }
+ } else if (message instanceof Traverser.Admin) {
+ final Traverser.Admin<?> traverser = (Traverser.Admin) message;
+ this.processTraverser(traverser);
+
+ } else if (message instanceof SideEffectSetMessage) {
+ this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue());
+ } else if (message instanceof Terminate) {
+ // assert this.isLeader || this.terminate != Terminate.MAYBE;
+ this.terminate = (Terminate) message;
+ this.self.send(this.self.address(), VoteToHaltMessage.instance());
+ } else if (message instanceof VoteToHaltMessage) {
+ // if there is a barrier and thus, halting at barrier, then process barrier
+ if (!this.barriers.isEmpty()) {
+ for (final Barrier barrier : this.barriers.values()) {
+ while (barrier.hasNextBarrier()) {
+ this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+ }
+ }
+ this.barriers.clear();
+ this.voteToHalt = false;
+ }
+ // use termination token to determine termination condition
+ if (null != this.terminate) {
+ if (this.isLeader) {
+ if (this.voteToHalt && Terminate.YES == this.terminate)
+ this.self.send(this.self.master(), VoteToHaltMessage.instance());
+ else
+ this.self.send(this.workers.get(this.neighborWorker), Terminate.YES);
+ } else
+ this.self.send(this.workers.get(this.neighborWorker), this.voteToHalt ? this.terminate : Terminate.NO);
+ this.terminate = null;
+ this.voteToHalt = true;
+ }
+ } else if (message instanceof BarrierDoneMessage) {
+ final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId());
+ while (step.hasNext()) {
+ sendTraverser(step.next());
+ }
+ } else {
+ throw new IllegalArgumentException("The following message is unknown: " + message);
+ }
+ }
+
+ @Override
+ public void terminate() {
+
+ }
+
+ //////////////
+
+ private void processTraverser(final Traverser.Admin traverser) {
+ // assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+ if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
+ GraphComputing.atMaster(step, false);
+ step.addStart(traverser);
+ if (step instanceof Barrier) {
+ this.barriers.put(step.getId(), (Barrier) step);
+ } else {
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
+ }
+ }
+
+ private void sendTraverser(final Traverser.Admin traverser) {
+ this.voteToHalt = false;
+ if (traverser.isHalted())
+ this.self.send(this.self.master(), traverser);
+ else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get()))
+ this.self.send(this.workers.get("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser);
+ else
+ this.self.send(this.self.address(), traverser);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
new file mode 100644
index 0000000..6ab66c4
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerTraversalSideEffects implements TraversalSideEffects {
+
+ private TraversalSideEffects sideEffects;
+ private Actor.Worker worker;
+
+
+ private WorkerTraversalSideEffects() {
+ // for serialization
+ }
+
+ public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
+ this.sideEffects = sideEffects;
+ this.worker = worker;
+ }
+
+ public TraversalSideEffects getSideEffects() {
+ return this.sideEffects;
+ }
+
+ @Override
+ public void set(final String key, final Object value) {
+ this.sideEffects.set(key, value);
+ }
+
+ @Override
+ public <V> V get(final String key) throws IllegalArgumentException {
+ return this.sideEffects.get(key);
+ }
+
+ @Override
+ public void remove(final String key) {
+ this.sideEffects.remove(key);
+ }
+
+ @Override
+ public Set<String> keys() {
+ return this.sideEffects.keys();
+ }
+
+ @Override
+ public void add(final String key, final Object value) {
+ this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
+ }
+
+ @Override
+ public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+ this.sideEffects.register(key, initialValue, reducer);
+ }
+
+ @Override
+ public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+ this.sideEffects.registerIfAbsent(key, initialValue, reducer);
+ }
+
+ @Override
+ public <V> BinaryOperator<V> getReducer(final String key) {
+ return this.sideEffects.getReducer(key);
+ }
+
+ @Override
+ public <V> Supplier<V> getSupplier(final String key) {
+ return this.sideEffects.getSupplier(key);
+ }
+
+ @Override
+ @Deprecated
+ public void registerSupplier(final String key, final Supplier supplier) {
+ this.sideEffects.registerSupplier(key, supplier);
+ }
+
+ @Override
+ @Deprecated
+ public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+ return this.sideEffects.getRegisteredSupplier(key);
+ }
+
+ @Override
+ public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+ this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
+ }
+
+ @Override
+ public <S> Supplier<S> getSackInitialValue() {
+ return this.sideEffects.getSackInitialValue();
+ }
+
+ @Override
+ public <S> UnaryOperator<S> getSackSplitter() {
+ return this.sideEffects.getSackSplitter();
+ }
+
+ @Override
+ public <S> BinaryOperator<S> getSackMerger() {
+ return this.sideEffects.getSackMerger();
+ }
+
+ @Override
+ public TraversalSideEffects clone() {
+ try {
+ final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone();
+ clone.sideEffects = this.sideEffects.clone();
+ return clone;
+ } catch (final CloneNotSupportedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void mergeInto(final TraversalSideEffects sideEffects) {
+ this.sideEffects.mergeInto(sideEffects);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/48ffd25a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
new file mode 100644
index 0000000..dba9b86
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierAddMessage {
+
+ private final Object barrier;
+ private final String stepId;
+
+ public BarrierAddMessage(final Barrier barrier) {
+ this.barrier = barrier.nextBarrier();
+ this.stepId = ((Step) barrier).getId();
+ }
+
+ public Object getBarrier() {
+ return this.barrier;
+ }
+
+ public String getStepId() {
+ return this.stepId;
+ }
+
+
+}