You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/24 19:02:36 UTC
tinkerpop git commit: okay,
its official official. Every ProcessTestSuite traversal works with
GraphActors except for those with Mutating. This is huge. GraphActors is
semantically sound and executes a distributed traversal via message passing.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 9bb808f0a -> 0d5db8208
okay, its official official. Every ProcessTestSuite traversal works with GraphActors except for those with Mutating. This is huge. GraphActors is semantically sound and executes a distributed traversal via message passing.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0d5db820
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0d5db820
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0d5db820
Branch: refs/heads/TINKERPOP-1564
Commit: 0d5db820877a1495e1c9bf7a2f12072ddc083b5f
Parents: 9bb808f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 24 12:02:33 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 24 12:02:33 2017 -0700
----------------------------------------------------------------------
.../traversal/TraversalMasterProgram.java | 20 +++---
.../traversal/TraversalWorkerProgram.java | 66 ++++++++++----------
.../verification/ActorVerificationStrategy.java | 6 --
.../traversal/step/sideEffect/InjectStep.java | 26 +++++++-
4 files changed, 65 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d5db820/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index eb12a25..90d6edb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -38,7 +38,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
@@ -85,13 +84,6 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i));
}
- // inject step processing should start at the master traversal
- if (this.traversal.getStartStep() instanceof InjectStep) {
- final Step<?, ?> step = this.traversal.getStartStep().getNextStep();
- while (step.hasNext()) {
- this.processTraverser(step.next());
- }
- }
// first pass of a two pass termination detection
this.voteToHalt = false;
this.master.send(this.neighborAddress, Terminate.NO);
@@ -149,22 +141,22 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
this.voteToHalt = true;
this.master.send(this.neighborAddress, Terminate.YES);
} else {
- // get any dangling local results
+ // get any dangling local results (e.g. workers have no data but a reducing barrier is waiting for data)
while (this.traversal.hasNext()) {
final Traverser.Admin traverser = this.traversal.nextTraverser();
this.traverserResults.add(-1 == this.orderCounter ? traverser : new OrderedTraverser(traverser, this.orderCounter++));
}
+ // if there is an ordering, order the result set
if (this.orderCounter != -1)
this.traverserResults.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
-
- TraversalActorProgram.attach(this.traverserResults, this.master.partitioner().getGraph());
// generate the final result to send back to the GraphActors program
final Map<String, Object> sideEffects = new HashMap<>();
for (final String key : this.traversal.getSideEffects().keys()) {
sideEffects.put(key, this.traversal.getSideEffects().get(key));
}
+ // set the result (traversers and side-effects) to return to user
this.master.setResult(Pair.with(this.traverserResults, sideEffects));
- // close master
+ // close master (and cascade close all workers)
this.master.close();
}
} else {
@@ -184,10 +176,11 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
}
private void processTraverser(final Traverser.Admin traverser) {
- TraversalActorProgram.attach(traverser, this.master.partitioner().getGraph());
if (traverser.isHalted() || traverser.get() instanceof Element) {
this.sendTraverser(traverser);
} else {
+ // attach traverser for local processing at the master actor
+ TraversalActorProgram.attach(traverser, this.master.partitioner().getGraph());
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
step.addStart(traverser);
if (step instanceof Barrier) {
@@ -202,6 +195,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
private void sendTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted()) {
+ TraversalActorProgram.attach(traverser, this.master.partitioner().getGraph());
this.traverserResults.add(traverser);
return;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d5db820/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index 67d8b4a..31d8a0f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -34,7 +34,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
@@ -59,22 +58,14 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
this.self = self;
+ // create a pass-through side-effects which sends SideEffectAddMessages to the master actor
final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
this.matrix = new TraversalMatrix<>(traversal);
+ // configure distributing and pushing semantics for worker execution
Distributing.configure(traversal, false, true);
Pushing.configure(traversal, true, false);
- }
-
- @Override
- public void setup() {
- // create termination ring topology
- final int i = this.self.workers().indexOf(this.self.address());
- this.neighborAddress = i == this.self.workers().size() - 1 ? this.self.master() : this.self.workers().get(i + 1);
- for (int j = 0; j < this.self.partition().partitioner().getPartitions().size(); j++) {
- this.partitionToWorkerMap.put(this.self.partition().partitioner().getPartitions().get(j), this.self.workers().get(j));
- }
- // configure all the GraphSteps to be partition-centric
+ // configure all the GraphSteps to be partition-centric (TODO: GraphStep should implement distributing and be smart to get the partition from the traversal)
TraversalHelper.getStepsOfAssignableClassRecursively(GraphStep.class, this.matrix.getTraversal()).forEach(graphStep -> {
if (0 == graphStep.getIds().length)
graphStep.setIteratorSupplier(graphStep.returnsVertex() ?
@@ -86,12 +77,20 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
() -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
}
});
- // once loaded, start processing start step (unless its an inject step)
+ }
+
+ @Override
+ public void setup() {
+ // create termination ring topology
+ final int i = this.self.workers().indexOf(this.self.address());
+ this.neighborAddress = i == this.self.workers().size() - 1 ? this.self.master() : this.self.workers().get(i + 1);
+ for (int j = 0; j < this.self.partition().partitioner().getPartitions().size(); j++) {
+ this.partitionToWorkerMap.put(this.self.partition().partitioner().getPartitions().get(j), this.self.workers().get(j));
+ }
+ // once loaded, start processing start step
final Step<?, ?> step = this.matrix.getTraversal().getStartStep();
- if (!(step instanceof InjectStep)) {
- while (step.hasNext()) {
- this.processTraverser(step.next());
- }
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
}
}
@@ -142,12 +141,15 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
//////////////
private void processTraverser(final Traverser.Admin traverser) {
+ // only mid-traversal V()/E() traversers can be non-locally processed
assert !(traverser.get() instanceof Element) ||
this.self.partition().contains((Element) traverser.get()) ||
- this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep; // only mid-traversal V()/E() traversers can be non-locally processed
- if (traverser.isHalted())
+ this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep;
+ if (traverser.isHalted()) {
+ // send halted traversers to master
this.sendTraverser(traverser);
- else {
+ } else {
+ // locally process traverser
TraversalActorProgram.attach(traverser, this.self.partition());
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
step.addStart(traverser);
@@ -164,26 +166,24 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
private void sendTraverser(final Traverser.Admin traverser) {
this.voteToHalt = false;
this.detachTraverser(traverser);
- if (traverser.isHalted())
+ if (traverser.isHalted()) {
+ // send halted traversers to master
this.self.send(this.self.master(), traverser);
- else if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep)
- this.broadcast(traverser);
- else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
+ } else if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep) {
+ // mid-traversal V()/E() traversers need to be broadcasted across all workers/partitions
+ for (final Address.Worker worker : this.self.workers()) {
+ this.self.send(worker, traverser);
+ }
+ } else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get())) {
+ // if the traverser references a non-local element, send the traverser to the appropriate worker/partition
this.self.send(this.partitionToWorkerMap.get(this.self.partition().partitioner().find((Element) traverser.get())), traverser);
- else
+ } else {
+ // if the traverser is local to the worker, send traverser to self
this.self.send(this.self.address(), traverser);
-
-
- }
-
- private void broadcast(final Object message) {
- for (final Address.Worker worker : this.self.workers()) {
- this.self.send(worker, message);
}
}
private final Traverser.Admin detachTraverser(final Traverser.Admin traverser) {
return TraversalActorProgram.DETACH ? traverser.detach() : traverser;
}
-
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d5db820/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
index 82f0e0c..49aafb6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
@@ -21,11 +21,8 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verificat
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -39,9 +36,6 @@ public final class ActorVerificationStrategy extends AbstractTraversalStrategy<T
@Override
public void apply(final Traversal.Admin<?, ?> traversal) {
- if (TraversalHelper.getStepsOfAssignableClass(InjectStep.class, TraversalHelper.getRootTraversal(traversal)).size() > 0)
- if (!(TraversalHelper.getRootTraversal(traversal).getStartStep() instanceof InjectStep))
- throw new VerificationException("Inject traversal currently not supported", traversal);
ReadOnlyStrategy.instance().apply(traversal);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d5db820/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
index 34b5eab..d2138fe 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
@@ -19,14 +19,21 @@
package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.util.iterator.ArrayIterator;
+import java.util.Collections;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class InjectStep<S> extends StartStep<S> {
+public final class InjectStep<S> extends StartStep<S> implements Distributing, Pushing {
private final S[] injections;
+ private boolean pushedBased = false;
+ private boolean atMaster = true;
@SafeVarargs
public InjectStep(final Traversal.Admin traversal, final S... injections) {
@@ -47,4 +54,21 @@ public final class InjectStep<S> extends StartStep<S> {
super.reset();
this.start = new ArrayIterator<>(this.injections);
}
+
+ @Override
+ protected Traverser.Admin<S> processNextStart() {
+ if (this.first && !this.atMaster && this.pushedBased)
+ this.start = Collections.emptyIterator();
+ return super.processNextStart();
+ }
+
+ @Override
+ public void setAtMaster(final boolean atMaster) {
+ this.atMaster = atMaster;
+ }
+
+ @Override
+ public void setPushBased(final boolean pushBased) {
+ this.pushedBased = pushBased;
+ }
}