You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/09 14:29:10 UTC

tinkerpop git commit: Removed an unneeded interface. Cleaned up TraverserMailbox. Added comments and various cleanups for discussion with @spmallette.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 07d830060 -> 16c658008


Removed an unneeded interface. Cleaned up TraverserMailbox. Added comments and various cleanups for discussion with @spmallette.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/16c65800
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/16c65800
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/16c65800

Branch: refs/heads/TINKERPOP-1564
Commit: 16c658008bbda33b480bc1d1a6e5a7bf1912e758
Parents: 07d8300
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Dec 9 07:29:01 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Dec 9 07:29:01 2016 -0700

----------------------------------------------------------------------
 .../process/akka/MasterTraversalActor.java      | 15 ++---
 .../process/akka/TraverserMailbox.java          | 41 ++++++-----
 .../process/akka/WorkerTraversalActor.java      | 71 ++++++++++----------
 .../akka/messages/SynchronizationMessage.java   | 26 -------
 .../akka/messages/VoteToHaltMessage.java        |  5 +-
 5 files changed, 65 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index e525220..c03cbe1 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -79,12 +79,15 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                     this.processTraverser(traverser);
                 }).
                 match(BarrierAddMessage.class, barrierMerge -> {
+                    // get the barrier updates from the workers to synchronize against the master barrier
                     final Barrier barrier = (Barrier) this.matrix.getStepById(barrierMerge.getStepId());
-                    assert null == this.barrierLock || this.barrierLock == barrier;
-                    this.barrierLock = barrier;
+                    if (null == this.barrierLock)
+                        this.barrierLock = barrier;
+                    assert this.barrierLock == barrier;
                     this.barrierLock.addBarrier(barrierMerge.getBarrier());
                 }).
                 match(SideEffectAddMessage.class, sideEffect -> {
+                    // get the side-effect updates from the workers to generate the master side-effects
                     this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue());
                 }).
                 match(VoteToHaltMessage.class, voteToHalt -> {
@@ -95,7 +98,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                             this.sendTraverser(step.next());
                         }
                         this.barrierLock = null;
-                        this.workers.get(this.leaderWorker).tell(WorkerTraversalActor.Terminate.MAYBE, self());
+                        this.workers.get(this.leaderWorker).tell(StartMessage.instance(), self());
                     } else
                         context().system().terminate();
                 }).build());
@@ -113,12 +116,6 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         }
     }
 
-    /*private void broadcast(final Object message) {
-        for (final ActorSelection worker : this.workers.values()) {
-            worker.tell(message, self());
-        }
-    }*/
-
     private void processTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted() || traverser.get() instanceof Element) {
             this.sendTraverser(traverser);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 0e84919..940dfb1 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -29,7 +29,6 @@ import com.typesafe.config.Config;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
 import scala.Option;
 
 import java.util.Queue;
@@ -42,49 +41,49 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
 
     public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
-        private final TraverserSet<?> traverserSet = new TraverserSet<>(new ConcurrentHashMap<>());
-        private final Queue<Envelope> barrierSyncs = new ConcurrentLinkedQueue<>();
-        private final Queue<Envelope> haltSyncs = new ConcurrentLinkedQueue<>();
-        private final Queue<Envelope> queue = new ConcurrentLinkedQueue<>();
+        private final TraverserSet<?> traverserMessages = new TraverserSet<>(new ConcurrentHashMap<>());
+        private final Queue<Envelope> haltMessages = new ConcurrentLinkedQueue<>();
+        private final Queue<Envelope> otherMessages = new ConcurrentLinkedQueue<>();
         private final ActorRef owner;
 
         public TraverserMessageQueue(final ActorRef owner) {
             this.owner = owner;
         }
 
-        // these must be implemented; queue used as example
         public void enqueue(final ActorRef receiver, final Envelope handle) {
             if (handle.message() instanceof Traverser.Admin)
-                this.traverserSet.offer((Traverser.Admin) handle.message());
-            else if (handle.message() instanceof SideEffectAddMessage)
-                this.queue.offer(handle);
+                this.traverserMessages.offer((Traverser.Admin) handle.message());
             else if (handle.message() instanceof VoteToHaltMessage)
-                this.haltSyncs.offer(handle);
+                this.haltMessages.offer(handle);
             else
-                this.queue.offer(handle);
+                this.otherMessages.offer(handle);
         }
 
         public Envelope dequeue() {
-            if (!this.queue.isEmpty())
-                return this.queue.poll();
-            else if (!this.traverserSet.isEmpty())
-                return new Envelope(this.traverserSet.poll(), this.owner);
-            else if (!this.barrierSyncs.isEmpty())
-                return this.barrierSyncs.poll();
+            if (!this.otherMessages.isEmpty())
+                return this.otherMessages.poll();
+            else if (!this.traverserMessages.isEmpty())
+                return new Envelope(this.traverserMessages.poll(), this.owner);
             else
-                return this.haltSyncs.poll();
+                return this.haltMessages.poll();
         }
 
         public int numberOfMessages() {
-            return this.queue.size() + this.traverserSet.size() + this.barrierSyncs.size() + this.haltSyncs.size();
+            return this.otherMessages.size() + this.traverserMessages.size() + this.haltMessages.size();
         }
 
         public boolean hasMessages() {
-            return !this.queue.isEmpty() || !this.traverserSet.isEmpty() || !this.barrierSyncs.isEmpty() || !this.haltSyncs.isEmpty();
+            return !this.otherMessages.isEmpty() || !this.traverserMessages.isEmpty() || !this.haltMessages.isEmpty();
         }
 
         public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
-            for (final Envelope handle : this.queue) {
+            for (final Envelope handle : this.otherMessages) {
+                deadLetters.enqueue(owner, handle);
+            }
+            for (final Traverser.Admin<?> traverser : this.traverserMessages) {
+                deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
+            }
+            for (final Envelope handle : this.haltMessages) {
                 deadLetters.enqueue(owner, handle);
             }
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 0107273..ad3f8d1 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -48,32 +48,29 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
 
     public enum Terminate {MAYBE, YES, NO}
 
-    private TraversalMatrix<?, ?> matrix = null;
+    private final TraversalMatrix<?, ?> matrix;
     private final Partition localPartition;
     private final Partitioner partitioner;
-    private boolean voteToHalt = true;
+    //
     private final Map<String, ActorSelection> workers = new HashMap<>();
-    private String neighbor = null;
-    private boolean leader = false;
-    private Terminate terminationToken = null;
-    ///
+    private final String neighborWorker;
+    private boolean isLeader;
+    private Terminate terminate = null;
+    private boolean voteToHalt = true;
     private Barrier barrierLock = null;
 
     public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
         System.out.println("worker[created]: " + self().path());
-        this.matrix = new TraversalMatrix<>(traversal);
-        this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+        // set up partition  and traversal information
         this.localPartition = localPartition;
         this.partitioner = partitioner;
+        this.matrix = new TraversalMatrix<>(traversal);
+        this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+        ((GraphStep) traversal.getStartStep()).setIteratorSupplier(this.localPartition::vertices);
         // create termination ring topology
-        for (int i = 0; i < this.partitioner.getPartitions().size(); i++) {
-            if (this.partitioner.getPartitions().get(i) == this.localPartition) {
-                this.neighbor = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
-                this.leader = i == 0;
-                break;
-            }
-        }
-        ((GraphStep) traversal.getStartStep()).setIteratorSupplier(localPartition::vertices);
+        final int i = this.partitioner.getPartitions().indexOf(this.localPartition);
+        this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
+        this.isLeader = i == 0;
 
         receive(ReceiveBuilder.
                 match(StartMessage.class, start -> {
@@ -83,19 +80,22 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
                         this.sendTraverser(step.next());
                     }
                     // internal vote to have in mailbox as final message to process
-                    if (this.leader)
-                        self().tell(Terminate.MAYBE, self());
+                    assert null == this.terminate;
+                    if (this.isLeader) {
+                        this.terminate = Terminate.MAYBE;
+                        self().tell(VoteToHaltMessage.instance(), self());
+                    }
                 }).
                 match(Traverser.Admin.class, traverser -> {
                     this.voteToHalt = false;
                     this.processTraverser(traverser);
                 }).
                 match(SideEffectAddMessage.class, sideEffect -> {
-                    // TODO
+                    // TODO: this is to update the local worker side-effects. necessary only for traversals that introspect on side-effects
                 }).
                 match(Terminate.class, terminate -> {
-                    assert this.leader || this.terminationToken != Terminate.MAYBE;
-                    this.terminationToken = terminate;
+                    assert this.isLeader || this.terminate != Terminate.MAYBE;
+                    this.terminate = terminate;
                     self().tell(VoteToHaltMessage.instance(), self());
                 }).
                 match(VoteToHaltMessage.class, haltSync -> {
@@ -104,17 +104,17 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
                         while (this.barrierLock.hasNextBarrier()) {
                             master().tell(new BarrierAddMessage(this.barrierLock), self());
                         }
-                        self().tell(VoteToHaltMessage.instance(), self());
-                    } else if (null != this.terminationToken) {
-                        // use termination token to determine termination condition
-                        if (this.leader) {
-                            if (this.terminationToken == Terminate.YES && this.voteToHalt)
+                    }
+                    // use termination token to determine termination condition
+                    if (null != this.terminate) {
+                        if (this.isLeader) {
+                            if (this.voteToHalt && Terminate.YES == this.terminate)
                                 master().tell(VoteToHaltMessage.instance(), self());
                             else
-                                worker(this.neighbor).tell(Terminate.YES, self());
+                                worker(this.neighborWorker).tell(Terminate.YES, self());
                         } else
-                            worker(this.neighbor).tell(this.voteToHalt ? this.terminationToken : Terminate.NO, self());
-                        this.terminationToken = null;
+                            worker(this.neighborWorker).tell(this.voteToHalt ? this.terminate : Terminate.NO, self());
+                        this.terminate = null;
                         this.voteToHalt = true;
                     }
                 }).build()
@@ -126,8 +126,9 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
         final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
         step.addStart(traverser);
         if (step instanceof Barrier) {
-            assert null == this.barrierLock || step == this.barrierLock;
-            this.barrierLock = (Barrier) step;
+            if (null == this.barrierLock)
+                this.barrierLock = (Barrier) step;
+            assert step == this.barrierLock;
         } else {
             while (step.hasNext()) {
                 this.sendTraverser(step.next());
@@ -145,11 +146,11 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
             self().tell(traverser, self());
     }
 
-    private ActorSelection worker(final String path) {
-        ActorSelection worker = this.workers.get(path);
+    private ActorSelection worker(final String workerPath) {
+        ActorSelection worker = this.workers.get(workerPath);
         if (null == worker) {
-            worker = context().actorSelection(path);
-            this.workers.put(path, worker);
+            worker = context().actorSelection(workerPath);
+            this.workers.put(workerPath, worker);
         }
         return worker;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SynchronizationMessage.java
deleted file mode 100644
index 54e7ade..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SynchronizationMessage.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface SynchronizationMessage {
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/16c65800/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
index f7d8970..767699c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
@@ -22,11 +22,12 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class VoteToHaltMessage implements SynchronizationMessage {
+public final class VoteToHaltMessage {
 
     private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage();
 
-    private VoteToHaltMessage() {}
+    private VoteToHaltMessage() {
+    }
 
     public static VoteToHaltMessage instance() {
         return INSTANCE;