You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/12 18:26:13 UTC

tinkerpop git commit: Fixed a bug where nested traversal side-effects were not using message passing. Lots of test cases now pass that didn't before. Getting close.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 f8fc43d46 -> 45b03b20e


Fixed a bug where nested traversal side-effects were not using message passing. Lots of test cases now pass that didn't before. Getting close.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/45b03b20
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/45b03b20
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/45b03b20

Branch: refs/heads/TINKERPOP-1564
Commit: 45b03b20e6835d78bac41fff27295b2ee9362df4
Parents: f8fc43d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Dec 12 11:26:10 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Dec 12 11:26:10 2016 -0700

----------------------------------------------------------------------
 .../process/akka/TinkerActorSystem.java         | 15 ++------
 .../process/akka/TraverserMailbox.java          | 39 ++++++++++----------
 .../process/akka/WorkerTraversalActor.java      |  5 ++-
 3 files changed, 27 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index 06183dc..f348293 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -21,29 +21,20 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
-import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 import org.apache.tinkerpop.gremlin.structure.util.HashPartitioner;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.traversal.PartitionerStrategy;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.javatuples.Pair;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.as;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.both;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
 
 /**
@@ -73,9 +64,9 @@ public final class TinkerActorSystem<S, E> {
 
     public static void main(String args[]) throws Exception {
         final Graph graph = TinkerGraph.open();
-        graph.io(GryoIo.build()).readGraph("data/grateful-dead.kryo");
+        graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
         GraphTraversalSource g = graph.traversal().withStrategies(new PartitionerStrategy(new HashPartitioner(graph.partitioner(), 3)));
-        System.out.println(g.V().both().both().count().toList());
+        System.out.println(g.V().repeat(out().group("a").by("name").by(count())).times(2).cap("a").toList());
         // 1406914
         /*for (int i = 0; i < 10000; i++) {
             final Graph graph = TinkerGraph.open();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 65983de..392b691 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -31,8 +31,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
 import scala.Option;
 
+import java.util.LinkedList;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -40,10 +40,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
 
     public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
-        private final Queue<Envelope> otherMessages = new ConcurrentLinkedQueue<>();
+        private final Queue<Envelope> otherMessages = new LinkedList<>();
         // TODO: we need a concurrent linked hash map
         private final TraverserSet<?> traverserMessages = new TraverserSet<>();
-        private final Queue<Envelope> haltMessages = new ConcurrentLinkedQueue<>();
+        private final Queue<Envelope> haltMessages = new LinkedList<>();
         private Envelope terminateToken = null;
         private final ActorRef owner;
         private final Object MUTEX = new Object();
@@ -67,18 +67,17 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
 
         public Envelope dequeue() {
             synchronized (MUTEX) {
+                if (!this.otherMessages.isEmpty())
+                    return this.otherMessages.poll();
                 if (!this.traverserMessages.isEmpty())
                     return new Envelope(this.traverserMessages.poll(), this.owner);
                 else if (null != this.terminateToken) {
                     final Envelope temp = this.terminateToken;
                     this.terminateToken = null;
                     return temp;
-                } else if (!this.haltMessages.isEmpty())
+                } else
                     return this.haltMessages.poll();
-                else
-                    return this.otherMessages.poll();
             }
-
         }
 
         public int numberOfMessages() {
@@ -94,18 +93,20 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
         }
 
         public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
-            for (final Envelope handle : this.otherMessages) {
-                deadLetters.enqueue(owner, handle);
-            }
-            for (final Traverser.Admin<?> traverser : this.traverserMessages) {
-                deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
-            }
-            for (final Envelope handle : this.haltMessages) {
-                deadLetters.enqueue(owner, handle);
-            }
-            if (null != this.terminateToken) {
-                deadLetters.enqueue(owner, this.terminateToken);
-                this.terminateToken = null;
+            synchronized (MUTEX) {
+                for (final Envelope handle : this.otherMessages) {
+                    deadLetters.enqueue(owner, handle);
+                }
+                for (final Traverser.Admin<?> traverser : this.traverserMessages) {
+                    deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
+                }
+                for (final Envelope handle : this.haltMessages) {
+                    deadLetters.enqueue(owner, handle);
+                }
+                if (null != this.terminateToken) {
+                    deadLetters.enqueue(owner, this.terminateToken);
+                    this.terminateToken = null;
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index b21d168..880f78e 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
@@ -70,8 +71,9 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
         // set up partition and traversal information
         this.localPartition = localPartition;
         this.partitioner = partitioner;
+        final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), context());
+        TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
         this.matrix = new TraversalMatrix<>(traversal);
-        this.matrix.getTraversal().setSideEffects(new WorkerTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
         final GraphStep graphStep = (GraphStep) traversal.getStartStep();
         if (0 == graphStep.getIds().length)
             ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges);
@@ -119,6 +121,7 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
                         while (this.barrierLock.hasNextBarrier()) {
                             master().tell(new BarrierAddMessage(this.barrierLock), self());
                         }
+                        this.voteToHalt = false;
                     }
                     // use termination token to determine termination condition
                     if (null != this.terminate) {