You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/07 20:53:12 UTC
tinkerpop git commit: added more test traversals to demonstrate cool
features. Cleaned this up a bit. Added a new StartSynchronizationMessage.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 c80abcf37 -> 3063b99b7
added more test traversals to demonstrate cool features. Cleaned this up a bit. Added a new StartSynchronizationMessage.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3063b99b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3063b99b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3063b99b
Branch: refs/heads/TINKERPOP-1564
Commit: 3063b99b7521de30965edc3f744d85cdba2f6c36
Parents: c80abcf
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 7 13:53:09 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 7 13:53:09 2016 -0700
----------------------------------------------------------------------
.../akka/ComputerActivationStrategy.java | 54 ++++++++++++++++++++
.../akka/DistributedTraversalSideEffects.java | 9 +---
.../process/akka/MasterTraversalActor.java | 33 +++++++-----
.../process/akka/TinkerActorSystem.java | 45 ++++++++++++----
.../process/akka/TraverserMailbox.java | 4 +-
.../process/akka/WorkerTraversalActor.java | 49 +++++++++++-------
.../akka/messages/BarrierMergeMessage.java | 47 +++++++++++++++++
.../process/akka/messages/BarrierMessage.java | 47 -----------------
.../akka/messages/SideEffectMergeMessage.java | 42 +++++++++++++++
.../akka/messages/SideEffectMessage.java | 53 -------------------
.../messages/StartSynchronizationMessage.java | 32 ++++++++++++
11 files changed, 265 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/ComputerActivationStrategy.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/ComputerActivationStrategy.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/ComputerActivationStrategy.java
new file mode 100644
index 0000000..35f2bb9
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/ComputerActivationStrategy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ComputerActivationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy {
+
+ private static final ComputerActivationStrategy INSTANCE = new ComputerActivationStrategy();
+
+ private ComputerActivationStrategy() {
+ }
+
+ @Override
+ public void apply(final Traversal.Admin<?, ?> traversal) {
+ if (!TraversalHelper.onGraphComputer(traversal))
+ return;
+ final boolean globalChild = TraversalHelper.isGlobalChild(traversal);
+ for (final Step<?, ?> step : traversal.getSteps()) {
+ // only global children are graph computing
+ if (globalChild && step instanceof GraphComputing)
+ ((GraphComputing) step).onGraphComputer();
+ }
+ }
+
+ public static ComputerActivationStrategy instance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
index ceacdd3..8b29a08 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
@@ -20,13 +20,8 @@
package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.ActorContext;
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
import java.util.Optional;
import java.util.Set;
@@ -78,7 +73,7 @@ public final class DistributedTraversalSideEffects implements TraversalSideEffec
@Override
public void add(final String key, final Object value) {
- this.context.parent().tell(new SideEffectMessage(key, value), this.context.self());
+ this.context.parent().tell(new SideEffectMergeMessage(key, value), this.context.self());
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 824c7ca..94d9c27 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -22,22 +22,27 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.AbstractActor;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.RequiresMessageQueue;
import akka.japi.pf.ReceiveBuilder;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMergeMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartSynchronizationMessage;
import java.util.ArrayList;
import java.util.HashMap;
@@ -54,11 +59,15 @@ public final class MasterTraversalActor extends AbstractActor implements Require
private final Traversal.Admin<?, ?> traversal;
private final TraversalMatrix<?, ?> matrix;
private final Partitioner partitioner;
- private List<ActorPath> workers;
+ private List<ActorSelection> workers;
private final Map<String, Set<ActorPath>> synchronizationLocks = new HashMap<>();
public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
System.out.println("master[created]: " + self().path());
+ final TraversalStrategies strategies = traversal.getStrategies().clone();
+ strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class);
+ strategies.addStrategies(ComputerActivationStrategy.instance());
+ traversal.setStrategies(strategies);
traversal.applyStrategies();
this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
this.matrix = new TraversalMatrix<>(this.traversal);
@@ -69,7 +78,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require
match(Traverser.Admin.class, traverser -> {
this.processTraverser(traverser);
}).
- match(BarrierMessage.class, barrier -> {
+ match(BarrierMergeMessage.class, barrier -> {
final Barrier barrierStep = ((Barrier) this.matrix.getStepById(barrier.getStepId()));
barrierStep.addBarrier(barrier.getBarrier());
broadcast(new BarrierSynchronizationMessage(barrierStep, true));
@@ -89,9 +98,9 @@ public final class MasterTraversalActor extends AbstractActor implements Require
}
}
}).
- match(SideEffectMessage.class, sideEffect -> {
- this.traversal.getSideEffects().add(sideEffect.getKey(),sideEffect.getValue());
- //this.broadcast(new SideEffectMessage(sideEffect.getKey(), sideEffect.getValue()));
+ match(SideEffectMergeMessage.class, sideEffect -> {
+ this.traversal.getSideEffects().add(sideEffect.getKey(), sideEffect.getValue());
+ //this.broadcast(new SideEffectMergeMessage(sideEffect.getKey(), sideEffect.getValue()));
}).
match(HaltSynchronizationMessage.class, haltSync -> {
Set<ActorPath> locks = this.synchronizationLocks.get(Traverser.Admin.HALT);
@@ -115,16 +124,16 @@ public final class MasterTraversalActor extends AbstractActor implements Require
this.workers = new ArrayList<>(partitions.size());
for (final Partition partition : partitions) {
final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), "worker-" + partition.hashCode());
- this.workers.add(worker.path());
+ this.workers.add(context().actorSelection(worker.path()));
}
- for (final ActorPath worker : this.workers) {
- context().actorSelection(worker).tell(TinkerActorSystem.State.START, self());
+ for (final ActorSelection worker : this.workers) {
+ worker.tell(StartSynchronizationMessage.instance(), self());
}
}
private void broadcast(final Object message) {
- for (final ActorPath worker : this.workers) {
- context().actorSelection(worker).tell(message, self());
+ for (final ActorSelection worker : this.workers) {
+ worker.tell(message, self());
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index bf3318c..b60f267 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -21,40 +21,65 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
import org.apache.tinkerpop.gremlin.structure.util.HashPartitioner;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.as;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.both;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class TinkerActorSystem {
- public static enum State {START}
-
private final ActorSystem system;
public TinkerActorSystem(final Traversal.Admin<?, ?> traversal) {
this.system = ActorSystem.create("traversal-" + traversal.hashCode());
- this.system.actorOf(Props.create(MasterTraversalActor.class, traversal, new HashPartitioner(traversal.getGraph().get().partitioner(), 10)), "master");
+ this.system.actorOf(Props.create(MasterTraversalActor.class, traversal, new HashPartitioner(traversal.getGraph().get().partitioner(), 3)), "master");
}
//////////////
public static void main(String args[]) throws Exception {
final Graph graph = TinkerGraph.open();
- graph.io(GryoIo.build()).readGraph("data/grateful-dead.kryo");
- /*final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().match(
- as("a").out().as("b"),
- as("b").in().as("c"),
- as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin();*/
- final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount("a").by("name").cap("a").select(Column.keys).unfold().limit(10).asAdmin();
- new TinkerActorSystem(traversal);
+ graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
+ final GraphTraversalSource g = graph.traversal().withComputer();
+ final List<Traversal.Admin<?, ?>> traversals = Arrays.asList(
+ // match() works
+ g.V().match(
+ as("a").out("created").as("b"),
+ as("b").in("created").as("c"),
+ as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin(),
+ // side-effects work
+ g.V().repeat(both()).times(2).
+ groupCount("a").by("name").
+ cap("a").
+ select(Column.keys).unfold().limit(3).asAdmin(),
+ // barries work and beyond the local star graph works
+ g.V().repeat(both()).times(2).hasLabel("person").
+ group().
+ by("name").
+ by(out("created").values("name").dedup().fold()).asAdmin()
+ );
+ for (final Traversal.Admin<?, ?> traversal : traversals) {
+ System.out.println("EXECUTING: " + traversal.getBytecode());
+ final TinkerActorSystem actors = new TinkerActorSystem(traversal);
+ while (!actors.system.isTerminated()) {
+
+ }
+ System.out.println("//////////////////////////////////\n");
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 69716a6..e0f84dc 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -30,7 +30,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
import scala.Option;
import java.util.Queue;
@@ -57,7 +57,7 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue
public void enqueue(final ActorRef receiver, final Envelope handle) {
if (handle.message() instanceof Traverser.Admin)
this.traverserSet.offer((Traverser.Admin) handle.message());
- else if (handle.message() instanceof SideEffectMessage)
+ else if (handle.message() instanceof SideEffectMergeMessage)
this.queue.offer(handle);
else if (handle.message() instanceof BarrierSynchronizationMessage)
this.barrierSyncs.offer(handle);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 96cf6dd..20475ef 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -20,22 +20,27 @@
package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
import akka.actor.AbstractActor;
-import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.dispatch.RequiresMessageQueue;
import akka.japi.pf.ReceiveBuilder;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMergeMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
-import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMergeMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartSynchronizationMessage;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -44,23 +49,21 @@ public final class WorkerTraversalActor extends AbstractActor implements
RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
private final TraversalMatrix<?, ?> matrix;
- private final Partition partition;
+ private final Partition localPartition;
private final Partitioner partitioner;
private boolean sentHaltMessage = false;
+ private final Map<String, ActorSelection> workers = new HashMap<>();
-
- public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition partition, final Partitioner partitioner) {
+ public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) {
System.out.println("worker[created]: " + self().path());
this.matrix = new TraversalMatrix<>(traversal);
-
- this.partition = partition;
+ this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+ this.localPartition = localPartition;
this.partitioner = partitioner;
- ((GraphStep) traversal.getStartStep()).setIteratorSupplier(partition::vertices);
-
+ ((GraphStep) traversal.getStartStep()).setIteratorSupplier(localPartition::vertices);
receive(ReceiveBuilder.
- match(TinkerActorSystem.State.class, state -> {
- this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
+ match(StartSynchronizationMessage.class, start -> {
final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
while (step.hasNext()) {
this.processTraverser(step.next());
@@ -68,13 +71,13 @@ public final class WorkerTraversalActor extends AbstractActor implements
}).
match(BarrierSynchronizationMessage.class, barrierSync -> {
final Barrier barrier = this.matrix.getStepById(barrierSync.getStepId());
- if(barrierSync.getLock()) {
+ if (barrierSync.getLock()) {
this.processBarrier(barrier);
} else {
barrier.done();
}
}).
- match(SideEffectMessage.class, sideEffect -> {
+ match(SideEffectMergeMessage.class, sideEffect -> {
// TODO: sideEffect.setSideEffect(this.matrix.getTraversal());
}).
match(HaltSynchronizationMessage.class, haltSync -> {
@@ -94,14 +97,20 @@ public final class WorkerTraversalActor extends AbstractActor implements
private void processTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted())
context().parent().tell(traverser, self());
- else if (traverser.get() instanceof Element && !this.partition.contains((Element) traverser.get())) {
+ else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) {
final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
- context().actorSelection("../worker-" + otherPartition.hashCode()).tell(traverser, self());
+ final String workerPathString = "../worker-" + otherPartition.hashCode();
+ ActorSelection worker = this.workers.get(workerPathString);
+ if (null == worker) {
+ worker = context().actorSelection(workerPathString);
+ this.workers.put(workerPathString, worker);
+ }
+ worker.tell(traverser, self());
} else {
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
step.addStart(traverser);
if (step instanceof Barrier) {
- this.processBarrier((Barrier)step);
+ this.processBarrier((Barrier) step);
} else {
while (step.hasNext()) {
this.processTraverser(step.next());
@@ -111,8 +120,10 @@ public final class WorkerTraversalActor extends AbstractActor implements
}
private void processBarrier(final Barrier barrier) {
- while(barrier.hasNextBarrier()) {
- context().parent().tell(new BarrierMessage(barrier), self());
+ if (barrier instanceof Bypassing)
+ ((Bypassing) barrier).setBypass(true);
+ while (barrier.hasNextBarrier()) {
+ context().parent().tell(new BarrierMergeMessage(barrier), self());
}
context().parent().tell(new BarrierSynchronizationMessage(barrier, true), self());
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
new file mode 100644
index 0000000..55e3e70
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMergeMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierMergeMessage {
+
+ private final Object barrier;
+ private final String stepId;
+
+ public BarrierMergeMessage(final Barrier barrier) {
+ this.barrier = barrier.nextBarrier();
+ this.stepId = ((Step) barrier).getId();
+ }
+
+ public Object getBarrier() {
+ return this.barrier;
+ }
+
+ public String getStepId() {
+ return this.stepId;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMessage.java
deleted file mode 100644
index 211d227..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/BarrierMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierMessage {
-
- private final Object barrier;
- private final String stepId;
-
- public BarrierMessage(final Barrier barrier) {
- this.barrier = barrier.nextBarrier();
- this.stepId = ((Step) barrier).getId();
- }
-
- public Object getBarrier() {
- return this.barrier;
- }
-
- public String getStepId() {
- return this.stepId;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
new file mode 100644
index 0000000..1cb6866
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMergeMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectMergeMessage {
+
+ private final String sideEffectKey;
+ private final Object sideEffect;
+
+ public SideEffectMergeMessage(final String sideEffectKey, final Object sideEffect) {
+ this.sideEffect = sideEffect;
+ this.sideEffectKey = sideEffectKey;
+ }
+
+ public String getKey() {
+ return this.sideEffectKey;
+ }
+
+ public Object getValue() {
+ return this.sideEffect;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
deleted file mode 100644
index 5b8685c..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectMessage {
-
- private final String sideEffectKey;
- private final Object sideEffect;
-
- public SideEffectMessage(final String sideEffectKey, final Object sideEffect) {
- this.sideEffect = sideEffect;
- this.sideEffectKey = sideEffectKey;
- }
-
- public void setSideEffect(final Traversal.Admin<?, ?> traversal) {
- traversal.getSideEffects().set(this.sideEffectKey, this.sideEffect);
- }
-
- public void addSideEffect(final Traversal.Admin<?, ?> traversal) {
- System.out.println(traversal.getSideEffects().getClass());
- traversal.getSideEffects().add(this.sideEffectKey, this.sideEffect);
- }
-
- public String getKey() {
- return this.sideEffectKey;
- }
-
- public Object getValue() {
- return this.sideEffect;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3063b99b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
new file mode 100644
index 0000000..bab03f5
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartSynchronizationMessage.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StartSynchronizationMessage {
+
+ private static final StartSynchronizationMessage INSTANCE = new StartSynchronizationMessage();
+
+ public static StartSynchronizationMessage instance() {
+ return INSTANCE;
+ }
+}