You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/12 18:26:13 UTC
tinkerpop git commit: fixed a bug where nested traversal side-effects
were not using message passing. lots of test cases now pass that didn't
before. getting close.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 f8fc43d46 -> 45b03b20e
fixed a bug where nested traversal side-effects were not using message passing. lots of test cases now pass that didn't before. getting close.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/45b03b20
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/45b03b20
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/45b03b20
Branch: refs/heads/TINKERPOP-1564
Commit: 45b03b20e6835d78bac41fff27295b2ee9362df4
Parents: f8fc43d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Dec 12 11:26:10 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Dec 12 11:26:10 2016 -0700
----------------------------------------------------------------------
.../process/akka/TinkerActorSystem.java | 15 ++------
.../process/akka/TraverserMailbox.java | 39 ++++++++++----------
.../process/akka/WorkerTraversalActor.java | 5 ++-
3 files changed, 27 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index 06183dc..f348293 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -21,29 +21,20 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.ActorSystem;
import akka.actor.Props;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
-import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
import org.apache.tinkerpop.gremlin.structure.util.HashPartitioner;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.traversal.PartitionerStrategy;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.javatuples.Pair;
-import java.util.Arrays;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.as;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.both;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
/**
@@ -73,9 +64,9 @@ public final class TinkerActorSystem<S, E> {
public static void main(String args[]) throws Exception {
final Graph graph = TinkerGraph.open();
- graph.io(GryoIo.build()).readGraph("data/grateful-dead.kryo");
+ graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
GraphTraversalSource g = graph.traversal().withStrategies(new PartitionerStrategy(new HashPartitioner(graph.partitioner(), 3)));
- System.out.println(g.V().both().both().count().toList());
+ System.out.println(g.V().repeat(out().group("a").by("name").by(count())).times(2).cap("a").toList());
// 1406914
/*for (int i = 0; i < 10000; i++) {
final Graph graph = TinkerGraph.open();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 65983de..392b691 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -31,8 +31,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
import scala.Option;
+import java.util.LinkedList;
import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -40,10 +40,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> {
public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics {
- private final Queue<Envelope> otherMessages = new ConcurrentLinkedQueue<>();
+ private final Queue<Envelope> otherMessages = new LinkedList<>();
// TODO: we need a concurrent linked hash map
private final TraverserSet<?> traverserMessages = new TraverserSet<>();
- private final Queue<Envelope> haltMessages = new ConcurrentLinkedQueue<>();
+ private final Queue<Envelope> haltMessages = new LinkedList<>();
private Envelope terminateToken = null;
private final ActorRef owner;
private final Object MUTEX = new Object();
@@ -67,18 +67,17 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
public Envelope dequeue() {
synchronized (MUTEX) {
+ if (!this.otherMessages.isEmpty())
+ return this.otherMessages.poll();
if (!this.traverserMessages.isEmpty())
return new Envelope(this.traverserMessages.poll(), this.owner);
else if (null != this.terminateToken) {
final Envelope temp = this.terminateToken;
this.terminateToken = null;
return temp;
- } else if (!this.haltMessages.isEmpty())
+ } else
return this.haltMessages.poll();
- else
- return this.otherMessages.poll();
}
-
}
public int numberOfMessages() {
@@ -94,18 +93,20 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
}
public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
- for (final Envelope handle : this.otherMessages) {
- deadLetters.enqueue(owner, handle);
- }
- for (final Traverser.Admin<?> traverser : this.traverserMessages) {
- deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
- }
- for (final Envelope handle : this.haltMessages) {
- deadLetters.enqueue(owner, handle);
- }
- if (null != this.terminateToken) {
- deadLetters.enqueue(owner, this.terminateToken);
- this.terminateToken = null;
+ synchronized (MUTEX) {
+ for (final Envelope handle : this.otherMessages) {
+ deadLetters.enqueue(owner, handle);
+ }
+ for (final Traverser.Admin<?> traverser : this.traverserMessages) {
+ deadLetters.enqueue(owner, new Envelope(traverser, this.owner));
+ }
+ for (final Envelope handle : this.haltMessages) {
+ deadLetters.enqueue(owner, handle);
+ }
+ if (null != this.terminateToken) {
+ deadLetters.enqueue(owner, this.terminateToken);
+ this.terminateToken = null;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index b21d168..880f78e 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
@@ -70,8 +71,9 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
// set up partition and traversal information
this.localPartition = localPartition;
this.partitioner = partitioner;
+ final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), context());
+ TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
this.matrix = new TraversalMatrix<>(traversal);
- this.matrix.getTraversal().setSideEffects(new WorkerTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
final GraphStep graphStep = (GraphStep) traversal.getStartStep();
if (0 == graphStep.getIds().length)
((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges);
@@ -119,6 +121,7 @@ public final class WorkerTraversalActor extends AbstractActor implements Require
while (this.barrierLock.hasNextBarrier()) {
master().tell(new BarrierAddMessage(this.barrierLock), self());
}
+ this.voteToHalt = false;
}
// use termination token to determine termination condition
if (null != this.terminate) {