You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/07 20:18:57 UTC

tinkerpop git commit: got side-effects working. stupid bug on my part (had sender and receiver mixed up). so there we have it, distributed OLTP is complete. traversers, barriers, halting, side-effects, bulking, ... it's all there.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 cb1073884 -> c80abcf37


got side-effects working. stupid bug on my part (had sender and receiver mixed up). so there we have it, distributed OLTP is complete. traversers, barriers, halting, side-effects, bulking, ... it's all there.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/c80abcf3
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/c80abcf3
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/c80abcf3

Branch: refs/heads/TINKERPOP-1564
Commit: c80abcf37e0be8d34fc6b295f47a99f9927efc1f
Parents: cb10738
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 7 13:18:53 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 7 13:18:53 2016 -0700

----------------------------------------------------------------------
 .../akka/DistributedTraversalSideEffects.java   | 12 ++++---
 .../process/akka/MasterTraversalActor.java      | 36 ++++++++++++--------
 .../process/akka/TinkerActorSystem.java         |  2 +-
 .../process/akka/TraverserMailbox.java          | 14 ++++++--
 .../process/akka/WorkerTraversalActor.java      | 27 ++++++++++-----
 .../messages/BarrierSynchronizationMessage.java |  8 ++++-
 .../akka/messages/SideEffectMessage.java        |  1 +
 7 files changed, 69 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
index 5be17c6..ceacdd3 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
@@ -20,6 +20,9 @@
 package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.ActorContext;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
@@ -37,15 +40,16 @@ import java.util.function.UnaryOperator;
 public final class DistributedTraversalSideEffects implements TraversalSideEffects {
 
     private TraversalSideEffects sideEffects;
-    private ActorContext worker;
+    private ActorContext context;
+
 
     private DistributedTraversalSideEffects() {
         // for serialization
     }
 
-    public DistributedTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext worker) {
+    public DistributedTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext context) {
         this.sideEffects = sideEffects;
-        this.worker = worker;
+        this.context = context;
     }
 
     public TraversalSideEffects getSideEffects() {
@@ -74,7 +78,7 @@ public final class DistributedTraversalSideEffects implements TraversalSideEffec
 
     @Override
     public void add(final String key, final Object value) {
-        this.worker.self().tell(new SideEffectMessage(key, value), this.worker.parent());
+        this.context.parent().tell(new SideEffectMessage(key, value), this.context.self());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 769c77c..824c7ca 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -41,8 +41,10 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffect
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -53,7 +55,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require
     private final TraversalMatrix<?, ?> matrix;
     private final Partitioner partitioner;
     private List<ActorPath> workers;
-    private final Map<String, Integer> synchronizationCounters = new HashMap<>();
+    private final Map<String, Set<ActorPath>> synchronizationLocks = new HashMap<>();
 
     public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
         System.out.println("master[created]: " + self().path());
@@ -70,33 +72,39 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                 match(BarrierMessage.class, barrier -> {
                     final Barrier barrierStep = ((Barrier) this.matrix.getStepById(barrier.getStepId()));
                     barrierStep.addBarrier(barrier.getBarrier());
-                    broadcast(new BarrierSynchronizationMessage(barrierStep));
+                    broadcast(new BarrierSynchronizationMessage(barrierStep, true));
                 }).
                 match(BarrierSynchronizationMessage.class, barrierSync -> {
-                    final Integer counter = this.synchronizationCounters.get(barrierSync.getStepId());
-                    final int newCounter = null == counter ? 1 : counter + 1;
-                    this.synchronizationCounters.put(barrierSync.getStepId(), newCounter);
-                    if (newCounter == this.workers.size()) {
+                    Set<ActorPath> locks = this.synchronizationLocks.get(barrierSync.getStepId());
+                    if (null == locks) {
+                        locks = new HashSet<>();
+                        this.synchronizationLocks.put(barrierSync.getStepId(), locks);
+                    }
+                    locks.add(sender().path());
+                    if (locks.size() == this.workers.size()) {
                         final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(barrierSync.getStepId());
+                        this.broadcast(new BarrierSynchronizationMessage((Barrier) step, false));
                         while (step.hasNext()) {
                             this.processTraverser(step.next());
                         }
                     }
                 }).
                 match(SideEffectMessage.class, sideEffect -> {
-                    sideEffect.addSideEffect(this.traversal);
-                    this.broadcast(new SideEffectMessage(sideEffect.getKey(), sideEffect.getValue()));
+                    this.traversal.getSideEffects().add(sideEffect.getKey(),sideEffect.getValue());
+                    //this.broadcast(new SideEffectMessage(sideEffect.getKey(), sideEffect.getValue()));
                 }).
                 match(HaltSynchronizationMessage.class, haltSync -> {
+                    Set<ActorPath> locks = this.synchronizationLocks.get(Traverser.Admin.HALT);
+                    if (null == locks) {
+                        locks = new HashSet<>();
+                        this.synchronizationLocks.put(Traverser.Admin.HALT, locks);
+                    }
                     if (haltSync.isHalt()) {
-                        final Integer counter = this.synchronizationCounters.get(Traverser.Admin.HALT);
-                        final int newCounter = null == counter ? 1 : counter + 1;
-                        this.synchronizationCounters.put(Traverser.Admin.HALT, newCounter);
-                        if (newCounter == this.workers.size()) {
+                        locks.add(sender().path());
+                        if (locks.size() == this.workers.size())
                             context().system().terminate();
-                        }
                     } else {
-                        this.synchronizationCounters.remove(Traverser.Admin.HALT);
+                        locks.remove(sender().path());
                         this.broadcast(new HaltSynchronizationMessage(true));
                     }
                 }).build());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index 6fc370b..bf3318c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -53,7 +53,7 @@ public final class TinkerActorSystem {
                 as("a").out().as("b"),
                 as("b").in().as("c"),
                 as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin();*/
-        final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount().by("name").asAdmin();
+        final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount("a").by("name").cap("a").select(Column.keys).unfold().limit(10).asAdmin();
         new TinkerActorSystem(traversal);
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 9d8931f..69716a6 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -30,6 +30,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
 import scala.Option;
 
 import java.util.Queue;
@@ -39,18 +40,25 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
+public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
 
     public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
         private final TraverserSet<?> traverserSet = new TraverserSet<>(new ConcurrentHashMap<>());
         private final Queue<Envelope> barrierSyncs = new ConcurrentLinkedQueue<>();
         private final Queue<Envelope> haltSyncs = new ConcurrentLinkedQueue<>();
         private final Queue<Envelope> queue = new ConcurrentLinkedQueue<>();
+        private final ActorRef owner;
+
+        public TraverserMessageQueue(final ActorRef owner) {
+            this.owner = owner;
+        }
 
         // these must be implemented; queue used as example
         public void enqueue(final ActorRef receiver, final Envelope handle) {
             if (handle.message() instanceof Traverser.Admin)
                 this.traverserSet.offer((Traverser.Admin) handle.message());
+            else if (handle.message() instanceof SideEffectMessage)
+                this.queue.offer(handle);
             else if (handle.message() instanceof BarrierSynchronizationMessage)
                 this.barrierSyncs.offer(handle);
             else if (handle.message() instanceof HaltSynchronizationMessage)
@@ -63,7 +71,7 @@ public class TraverserMailbox implements MailboxType, ProducesMessageQueue<Trave
             if (!this.queue.isEmpty())
                 return this.queue.poll();
             else if (!this.traverserSet.isEmpty())
-                return new Envelope(this.traverserSet.poll(), ActorRef.noSender());
+                return new Envelope(this.traverserSet.poll(), this.owner);
             else if (!this.barrierSyncs.isEmpty())
                 return this.barrierSyncs.poll();
             else
@@ -92,7 +100,7 @@ public class TraverserMailbox implements MailboxType, ProducesMessageQueue<Trave
 
     // The create method is called to create the MessageQueue
     public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) {
-        return new TraverserMessageQueue();
+        return new TraverserMessageQueue(owner.isEmpty() ? ActorRef.noSender() : owner.get());
     }
 
     public static interface TraverserSetSemantics {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index fa0a14d..96cf6dd 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -52,7 +52,7 @@ public final class WorkerTraversalActor extends AbstractActor implements
     public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition partition, final Partitioner partitioner) {
         System.out.println("worker[created]: " + self().path());
         this.matrix = new TraversalMatrix<>(traversal);
-        this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+
         this.partition = partition;
         this.partitioner = partitioner;
         ((GraphStep) traversal.getStartStep()).setIteratorSupplier(partition::vertices);
@@ -60,27 +60,31 @@ public final class WorkerTraversalActor extends AbstractActor implements
 
         receive(ReceiveBuilder.
                 match(TinkerActorSystem.State.class, state -> {
+                    this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
                     final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
                     while (step.hasNext()) {
                         this.processTraverser(step.next());
                     }
                 }).
                 match(BarrierSynchronizationMessage.class, barrierSync -> {
-                    final Barrier step = this.matrix.getStepById(barrierSync.getStepId());
-                    while (step.hasNextBarrier()) {
-                        sender().tell(new BarrierMessage(step), self());
+                    final Barrier barrier = this.matrix.getStepById(barrierSync.getStepId());
+                    if(barrierSync.getLock()) {
+                        this.processBarrier(barrier);
+                    } else {
+                        barrier.done();
                     }
-                    sender().tell(new BarrierSynchronizationMessage(step), self());
                 }).
                 match(SideEffectMessage.class, sideEffect -> {
-                    sideEffect.setSideEffect(this.matrix.getTraversal());
+                    // TODO: sideEffect.setSideEffect(this.matrix.getTraversal());
                 }).
                 match(HaltSynchronizationMessage.class, haltSync -> {
                     sender().tell(new HaltSynchronizationMessage(true), self());
+                    this.sentHaltMessage = true;
                 }).
                 match(Traverser.Admin.class, traverser -> {
                     if (this.sentHaltMessage) {
                         context().parent().tell(new HaltSynchronizationMessage(false), self());
+                        this.sentHaltMessage = false;
                     }
                     this.processTraverser(traverser);
                 }).build()
@@ -89,7 +93,7 @@ public final class WorkerTraversalActor extends AbstractActor implements
 
     private void processTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted())
-            context().parent().tell(traverser, ActorRef.noSender());
+            context().parent().tell(traverser, self());
         else if (traverser.get() instanceof Element && !this.partition.contains((Element) traverser.get())) {
             final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
             context().actorSelection("../worker-" + otherPartition.hashCode()).tell(traverser, self());
@@ -97,7 +101,7 @@ public final class WorkerTraversalActor extends AbstractActor implements
             final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
             step.addStart(traverser);
             if (step instanceof Barrier) {
-                context().parent().tell(new BarrierMessage((Barrier) step), self());
+                this.processBarrier((Barrier)step);
             } else {
                 while (step.hasNext()) {
                     this.processTraverser(step.next());
@@ -105,4 +109,11 @@ public final class WorkerTraversalActor extends AbstractActor implements
             }
         }
     }
+
+    private void processBarrier(final Barrier barrier) {
+        while(barrier.hasNextBarrier()) {
+            context().parent().tell(new BarrierMessage(barrier), self());
+        }
+        context().parent().tell(new BarrierSynchronizationMessage(barrier, true), self());
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
index 3e5ff14..467ac28 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierSynchronizationMessage.java
@@ -28,12 +28,18 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 public final class BarrierSynchronizationMessage implements SynchronizationMessage {
 
     private final String stepId;
+    private final boolean lock;
 
-    public BarrierSynchronizationMessage(final Barrier barrier) {
+    public BarrierSynchronizationMessage(final Barrier barrier, final boolean lock) {
         this.stepId = ((Step) barrier).getId();
+        this.lock = lock;
     }
 
     public String getStepId() {
         return this.stepId;
     }
+
+    public boolean getLock() {
+        return this.lock;
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c80abcf3/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
index e9721cc..5b8685c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
@@ -39,6 +39,7 @@ public final class SideEffectMessage {
     }
 
     public void addSideEffect(final Traversal.Admin<?, ?> traversal) {
+        System.out.println(traversal.getSideEffects().getClass());
         traversal.getSideEffects().add(this.sideEffectKey, this.sideEffect);
     }