You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/07 20:18:57 UTC
tinkerpop git commit: got side-effects working. stupid bug on my part
(had sender and receiver mixed up). so there we have it,
distributed OLTP is complete. traversers, barriers, halting, side-effects,
bulking, ... it's all there.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 cb1073884 -> c80abcf37
got side-effects working. stupid bug on my part (had sender and receiver mixed up). so there we have it, distributed OLTP is complete. traversers, barriers, halting, side-effects, bulking, ... it's all there.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/c80abcf3
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/c80abcf3
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/c80abcf3
Branch: refs/heads/TINKERPOP-1564
Commit: c80abcf37e0be8d34fc6b295f47a99f9927efc1f
Parents: cb10738
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 7 13:18:53 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 7 13:18:53 2016 -0700
----------------------------------------------------------------------
.../akka/DistributedTraversalSideEffects.java | 12 ++++---
.../process/akka/MasterTraversalActor.java | 36 ++++++++++++--------
.../process/akka/TinkerActorSystem.java | 2 +-
.../process/akka/TraverserMailbox.java | 14 ++++++--
.../process/akka/WorkerTraversalActor.java | 27 ++++++++++-----
.../messages/BarrierSynchronizationMessage.java | 8 ++++-
.../akka/messages/SideEffectMessage.java | 1 +
7 files changed, 69 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
index 5be17c6..ceacdd3 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
@@ -20,6 +20,9 @@
package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.ActorContext;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
@@ -37,15 +40,16 @@ import java.util.function.UnaryOperator;
public final class DistributedTraversalSideEffects implements TraversalSideEffects {
private TraversalSideEffects sideEffects;
- private ActorContext worker;
+ private ActorContext context;
+
private DistributedTraversalSideEffects() {
// for serialization
}
- public DistributedTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext worker) {
+ public DistributedTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext context) {
this.sideEffects = sideEffects;
- this.worker = worker;
+ this.context = context;
}
public TraversalSideEffects getSideEffects() {
@@ -74,7 +78,7 @@ public final class DistributedTraversalSideEffects implements TraversalSideEffec
@Override
public void add(final String key, final Object value) {
- this.worker.self().tell(new SideEffectMessage(key, value), this.worker.parent());
+ this.context.parent().tell(new SideEffectMessage(key, value), this.context.self());
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 769c77c..824c7ca 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -41,8 +41,10 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffect
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -53,7 +55,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require
private final TraversalMatrix<?, ?> matrix;
private final Partitioner partitioner;
private List<ActorPath> workers;
- private final Map<String, Integer> synchronizationCounters = new HashMap<>();
+ private final Map<String, Set<ActorPath>> synchronizationLocks = new HashMap<>();
public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
System.out.println("master[created]: " + self().path());
@@ -70,33 +72,39 @@ public final class MasterTraversalActor extends AbstractActor implements Require
match(BarrierMessage.class, barrier -> {
final Barrier barrierStep = ((Barrier) this.matrix.getStepById(barrier.getStepId()));
barrierStep.addBarrier(barrier.getBarrier());
- broadcast(new BarrierSynchronizationMessage(barrierStep));
+ broadcast(new BarrierSynchronizationMessage(barrierStep, true));
}).
match(BarrierSynchronizationMessage.class, barrierSync -> {
- final Integer counter = this.synchronizationCounters.get(barrierSync.getStepId());
- final int newCounter = null == counter ? 1 : counter + 1;
- this.synchronizationCounters.put(barrierSync.getStepId(), newCounter);
- if (newCounter == this.workers.size()) {
+ Set<ActorPath> locks = this.synchronizationLocks.get(barrierSync.getStepId());
+ if (null == locks) {
+ locks = new HashSet<>();
+ this.synchronizationLocks.put(barrierSync.getStepId(), locks);
+ }
+ locks.add(sender().path());
+ if (locks.size() == this.workers.size()) {
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(barrierSync.getStepId());
+ this.broadcast(new BarrierSynchronizationMessage((Barrier) step, false));
while (step.hasNext()) {
this.processTraverser(step.next());
}
}
}).
match(SideEffectMessage.class, sideEffect -> {
- sideEffect.addSideEffect(this.traversal);
- this.broadcast(new SideEffectMessage(sideEffect.getKey(), sideEffect.getValue()));
+ this.traversal.getSideEffects().add(sideEffect.getKey(),sideEffect.getValue());
+ //this.broadcast(new SideEffectMessage(sideEffect.getKey(), sideEffect.getValue()));
}).
match(HaltSynchronizationMessage.class, haltSync -> {
+ Set<ActorPath> locks = this.synchronizationLocks.get(Traverser.Admin.HALT);
+ if (null == locks) {
+ locks = new HashSet<>();
+ this.synchronizationLocks.put(Traverser.Admin.HALT, locks);
+ }
if (haltSync.isHalt()) {
- final Integer counter = this.synchronizationCounters.get(Traverser.Admin.HALT);
- final int newCounter = null == counter ? 1 : counter + 1;
- this.synchronizationCounters.put(Traverser.Admin.HALT, newCounter);
- if (newCounter == this.workers.size()) {
+ locks.add(sender().path());
+ if (locks.size() == this.workers.size())
context().system().terminate();
- }
} else {
- this.synchronizationCounters.remove(Traverser.Admin.HALT);
+ locks.remove(sender().path());
this.broadcast(new HaltSynchronizationMessage(true));
}
}).build());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index 6fc370b..bf3318c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -53,7 +53,7 @@ public final class TinkerActorSystem {
as("a").out().as("b"),
as("b").in().as("c"),
as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin();*/
- final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount().by("name").asAdmin();
+ final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount("a").by("name").cap("a").select(Column.keys).unfold().limit(10).asAdmin();
new TinkerActorSystem(traversal);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 9d8931f..69716a6 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -30,6 +30,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
import scala.Option;
import java.util.Queue;
@@ -39,18 +40,25 @@ import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
+public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
private final TraverserSet<?> traverserSet = new TraverserSet<>(new ConcurrentHashMap<>());
private final Queue<Envelope> barrierSyncs = new ConcurrentLinkedQueue<>();
private final Queue<Envelope> haltSyncs = new ConcurrentLinkedQueue<>();
private final Queue<Envelope> queue = new ConcurrentLinkedQueue<>();
+ private final ActorRef owner;
+
+ public TraverserMessageQueue(final ActorRef owner) {
+ this.owner = owner;
+ }
// these must be implemented; queue used as example
public void enqueue(final ActorRef receiver, final Envelope handle) {
if (handle.message() instanceof Traverser.Admin)
this.traverserSet.offer((Traverser.Admin) handle.message());
+ else if (handle.message() instanceof SideEffectMessage)
+ this.queue.offer(handle);
else if (handle.message() instanceof BarrierSynchronizationMessage)
this.barrierSyncs.offer(handle);
else if (handle.message() instanceof HaltSynchronizationMessage)
@@ -63,7 +71,7 @@ public class TraverserMailbox implements MailboxType, ProducesMessageQueue<Trave
if (!this.queue.isEmpty())
return this.queue.poll();
else if (!this.traverserSet.isEmpty())
- return new Envelope(this.traverserSet.poll(), ActorRef.noSender());
+ return new Envelope(this.traverserSet.poll(), this.owner);
else if (!this.barrierSyncs.isEmpty())
return this.barrierSyncs.poll();
else
@@ -92,7 +100,7 @@ public class TraverserMailbox implements MailboxType, ProducesMessageQueue<Trave
// The create method is called to create the MessageQueue
public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) {
- return new TraverserMessageQueue();
+ return new TraverserMessageQueue(owner.isEmpty() ? ActorRef.noSender() : owner.get());
}
public static interface TraverserSetSemantics {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index fa0a14d..96cf6dd 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -52,7 +52,7 @@ public final class WorkerTraversalActor extends AbstractActor implements
public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition partition, final Partitioner partitioner) {
System.out.println("worker[created]: " + self().path());
this.matrix = new TraversalMatrix<>(traversal);
- this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+
this.partition = partition;
this.partitioner = partitioner;
((GraphStep) traversal.getStartStep()).setIteratorSupplier(partition::vertices);
@@ -60,27 +60,31 @@ public final class WorkerTraversalActor extends AbstractActor implements
receive(ReceiveBuilder.
match(TinkerActorSystem.State.class, state -> {
+ this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
while (step.hasNext()) {
this.processTraverser(step.next());
}
}).
match(BarrierSynchronizationMessage.class, barrierSync -> {
- final Barrier step = this.matrix.getStepById(barrierSync.getStepId());
- while (step.hasNextBarrier()) {
- sender().tell(new BarrierMessage(step), self());
+ final Barrier barrier = this.matrix.getStepById(barrierSync.getStepId());
+ if(barrierSync.getLock()) {
+ this.processBarrier(barrier);
+ } else {
+ barrier.done();
}
- sender().tell(new BarrierSynchronizationMessage(step), self());
}).
match(SideEffectMessage.class, sideEffect -> {
- sideEffect.setSideEffect(this.matrix.getTraversal());
+ // TODO: sideEffect.setSideEffect(this.matrix.getTraversal());
}).
match(HaltSynchronizationMessage.class, haltSync -> {
sender().tell(new HaltSynchronizationMessage(true), self());
+ this.sentHaltMessage = true;
}).
match(Traverser.Admin.class, traverser -> {
if (this.sentHaltMessage) {
context().parent().tell(new HaltSynchronizationMessage(false), self());
+ this.sentHaltMessage = false;
}
this.processTraverser(traverser);
}).build()
@@ -89,7 +93,7 @@ public final class WorkerTraversalActor extends AbstractActor implements
private void processTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted())
- context().parent().tell(traverser, ActorRef.noSender());
+ context().parent().tell(traverser, self());
else if (traverser.get() instanceof Element && !this.partition.contains((Element) traverser.get())) {
final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
context().actorSelection("../worker-" + otherPartition.hashCode()).tell(traverser, self());
@@ -97,7 +101,7 @@ public final class WorkerTraversalActor extends AbstractActor implements
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
step.addStart(traverser);
if (step instanceof Barrier) {
- context().parent().tell(new BarrierMessage((Barrier) step), self());
+ this.processBarrier((Barrier)step);
} else {
while (step.hasNext()) {
this.processTraverser(step.next());
@@ -105,4 +109,11 @@ public final class WorkerTraversalActor extends AbstractActor implements
}
}
}
+
+ private void processBarrier(final Barrier barrier) {
+ while(barrier.hasNextBarrier()) {
+ context().parent().tell(new BarrierMessage(barrier), self());
+ }
+ context().parent().tell(new BarrierSynchronizationMessage(barrier, true), self());
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
index 3e5ff14..467ac28 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
@@ -28,12 +28,18 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
public final class BarrierSynchronizationMessage implements SynchronizationMessage {
private final String stepId;
+ private final boolean lock;
- public BarrierSynchronizationMessage(final Barrier barrier) {
+ public BarrierSynchronizationMessage(final Barrier barrier, final boolean lock) {
this.stepId = ((Step) barrier).getId();
+ this.lock = lock;
}
public String getStepId() {
return this.stepId;
}
+
+ public boolean getLock() {
+ return this.lock;
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
index e9721cc..5b8685c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
@@ -39,6 +39,7 @@ public final class SideEffectMessage {
}
public void addSideEffect(final Traversal.Admin<?, ?> traversal) {
+ System.out.println(traversal.getSideEffects().getClass());
traversal.getSideEffects().add(this.sideEffectKey, this.sideEffect);
}