You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/24 19:02:36 UTC

tinkerpop git commit: okay, it's official official. Every ProcessTestSuite traversal works with GraphActors except for those with Mutating steps. This is huge. GraphActors is semantically sound and executes a distributed traversal via message passing.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 9bb808f0a -> 0d5db8208


okay, it's official official. Every ProcessTestSuite traversal works with GraphActors except for those with Mutating steps. This is huge. GraphActors is semantically sound and executes a distributed traversal via message passing.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0d5db820
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0d5db820
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0d5db820

Branch: refs/heads/TINKERPOP-1564
Commit: 0d5db820877a1495e1c9bf7a2f12072ddc083b5f
Parents: 9bb808f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 24 12:02:33 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 24 12:02:33 2017 -0700

----------------------------------------------------------------------
 .../traversal/TraversalMasterProgram.java       | 20 +++---
 .../traversal/TraversalWorkerProgram.java       | 66 ++++++++++----------
 .../verification/ActorVerificationStrategy.java |  6 --
 .../traversal/step/sideEffect/InjectStep.java   | 26 +++++++-
 4 files changed, 65 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d5db820/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index eb12a25..90d6edb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -38,7 +38,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
@@ -85,13 +84,6 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
         for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
             this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i));
         }
-        // inject step processing should start at the master traversal
-        if (this.traversal.getStartStep() instanceof InjectStep) {
-            final Step<?, ?> step = this.traversal.getStartStep().getNextStep();
-            while (step.hasNext()) {
-                this.processTraverser(step.next());
-            }
-        }
         // first pass of a two pass termination detection
         this.voteToHalt = false;
         this.master.send(this.neighborAddress, Terminate.NO);
@@ -149,22 +141,22 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
                 this.voteToHalt = true;
                 this.master.send(this.neighborAddress, Terminate.YES);
             } else {
-                // get any dangling local results
+                // get any dangling local results (e.g. workers have no data but a reducing barrier is waiting for data)
                 while (this.traversal.hasNext()) {
                     final Traverser.Admin traverser = this.traversal.nextTraverser();
                     this.traverserResults.add(-1 == this.orderCounter ? traverser : new OrderedTraverser(traverser, this.orderCounter++));
                 }
+                // if there is an ordering, order the result set
                 if (this.orderCounter != -1)
                     this.traverserResults.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
-
-                TraversalActorProgram.attach(this.traverserResults, this.master.partitioner().getGraph());
                 // generate the final result to send back to the GraphActors program
                 final Map<String, Object> sideEffects = new HashMap<>();
                 for (final String key : this.traversal.getSideEffects().keys()) {
                     sideEffects.put(key, this.traversal.getSideEffects().get(key));
                 }
+                // set the result (traversers and side-effects) to return to user
                 this.master.setResult(Pair.with(this.traverserResults, sideEffects));
-                // close master
+                // close master (and cascade close all workers)
                 this.master.close();
             }
         } else {
@@ -184,10 +176,11 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
     }
 
     private void processTraverser(final Traverser.Admin traverser) {
-        TraversalActorProgram.attach(traverser, this.master.partitioner().getGraph());
         if (traverser.isHalted() || traverser.get() instanceof Element) {
             this.sendTraverser(traverser);
         } else {
+            // attach traverser for local processing at the master actor
+            TraversalActorProgram.attach(traverser, this.master.partitioner().getGraph());
             final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
             step.addStart(traverser);
             if (step instanceof Barrier) {
@@ -202,6 +195,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
 
     private void sendTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted()) {
+            TraversalActorProgram.attach(traverser, this.master.partitioner().getGraph());
             this.traverserResults.add(traverser);
             return;
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d5db820/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index 67d8b4a..31d8a0f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -34,7 +34,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
@@ -59,22 +58,14 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
 
     public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
         this.self = self;
+        // create a pass-through side-effects which sends SideEffectAddMessages to the master actor
         final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
         TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
         this.matrix = new TraversalMatrix<>(traversal);
+        // configure distributing and pushing semantics for worker execution
         Distributing.configure(traversal, false, true);
         Pushing.configure(traversal, true, false);
-    }
-
-    @Override
-    public void setup() {
-        // create termination ring topology
-        final int i = this.self.workers().indexOf(this.self.address());
-        this.neighborAddress = i == this.self.workers().size() - 1 ? this.self.master() : this.self.workers().get(i + 1);
-        for (int j = 0; j < this.self.partition().partitioner().getPartitions().size(); j++) {
-            this.partitionToWorkerMap.put(this.self.partition().partitioner().getPartitions().get(j), this.self.workers().get(j));
-        }
-        // configure all the GraphSteps to be partition-centric
+        // configure all the GraphSteps to be partition-centric (TODO: GraphStep should implement distributing and be smart to get the partition from the traversal)
         TraversalHelper.getStepsOfAssignableClassRecursively(GraphStep.class, this.matrix.getTraversal()).forEach(graphStep -> {
             if (0 == graphStep.getIds().length)
                 graphStep.setIteratorSupplier(graphStep.returnsVertex() ?
@@ -86,12 +77,20 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
                         () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
             }
         });
-        // once loaded, start processing start step (unless its an inject step)
+    }
+
+    @Override
+    public void setup() {
+        // create termination ring topology
+        final int i = this.self.workers().indexOf(this.self.address());
+        this.neighborAddress = i == this.self.workers().size() - 1 ? this.self.master() : this.self.workers().get(i + 1);
+        for (int j = 0; j < this.self.partition().partitioner().getPartitions().size(); j++) {
+            this.partitionToWorkerMap.put(this.self.partition().partitioner().getPartitions().get(j), this.self.workers().get(j));
+        }
+        // once loaded, start processing start step
         final Step<?, ?> step = this.matrix.getTraversal().getStartStep();
-        if (!(step instanceof InjectStep)) {
-            while (step.hasNext()) {
-                this.processTraverser(step.next());
-            }
+        while (step.hasNext()) {
+            this.sendTraverser(step.next());
         }
     }
 
@@ -142,12 +141,15 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
     //////////////
 
     private void processTraverser(final Traverser.Admin traverser) {
+        // only mid-traversal V()/E() traversers can be non-locally processed
         assert !(traverser.get() instanceof Element) ||
                 this.self.partition().contains((Element) traverser.get()) ||
-                this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep; // only mid-traversal V()/E() traversers can be non-locally processed
-        if (traverser.isHalted())
+                this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep;
+        if (traverser.isHalted()) {
+            // send halted traversers to master
             this.sendTraverser(traverser);
-        else {
+        } else {
+            // locally process traverser
             TraversalActorProgram.attach(traverser, this.self.partition());
             final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
             step.addStart(traverser);
@@ -164,26 +166,24 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
     private void sendTraverser(final Traverser.Admin traverser) {
         this.voteToHalt = false;
         this.detachTraverser(traverser);
-        if (traverser.isHalted())
+        if (traverser.isHalted()) {
+            // send halted traversers to master
             this.self.send(this.self.master(), traverser);
-        else if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep)
-            this.broadcast(traverser);
-        else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
+        } else if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep) {
+            // mid-traversal V()/E() traversers need to be broadcasted across all workers/partitions
+            for (final Address.Worker worker : this.self.workers()) {
+                this.self.send(worker, traverser);
+            }
+        } else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get())) {
+            // if the traverser references a non-local element, send the traverser to the appropriate worker/partition
             this.self.send(this.partitionToWorkerMap.get(this.self.partition().partitioner().find((Element) traverser.get())), traverser);
-        else
+        } else {
+            // if the traverser is local to the worker, send traverser to self
             this.self.send(this.self.address(), traverser);
-
-
-    }
-
-    private void broadcast(final Object message) {
-        for (final Address.Worker worker : this.self.workers()) {
-            this.self.send(worker, message);
         }
     }
 
     private final Traverser.Admin detachTraverser(final Traverser.Admin traverser) {
         return TraversalActorProgram.DETACH ? traverser.detach() : traverser;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d5db820/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
index 82f0e0c..49aafb6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
@@ -21,11 +21,8 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verificat
 
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -39,9 +36,6 @@ public final class ActorVerificationStrategy extends AbstractTraversalStrategy<T
 
     @Override
     public void apply(final Traversal.Admin<?, ?> traversal) {
-        if (TraversalHelper.getStepsOfAssignableClass(InjectStep.class, TraversalHelper.getRootTraversal(traversal)).size() > 0)
-            if (!(TraversalHelper.getRootTraversal(traversal).getStartStep() instanceof InjectStep))
-                throw new VerificationException("Inject traversal currently not supported", traversal);
         ReadOnlyStrategy.instance().apply(traversal);
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d5db820/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
index 34b5eab..d2138fe 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
@@ -19,14 +19,21 @@
 package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
 
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
 import org.apache.tinkerpop.gremlin.util.iterator.ArrayIterator;
 
+import java.util.Collections;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class InjectStep<S> extends StartStep<S> {
+public final class InjectStep<S> extends StartStep<S> implements Distributing, Pushing {
 
     private final S[] injections;
+    private boolean pushedBased = false;
+    private boolean atMaster = true;
 
     @SafeVarargs
     public InjectStep(final Traversal.Admin traversal, final S... injections) {
@@ -47,4 +54,21 @@ public final class InjectStep<S> extends StartStep<S> {
         super.reset();
         this.start = new ArrayIterator<>(this.injections);
     }
+
+    @Override
+    protected Traverser.Admin<S> processNextStart() {
+        if (this.first && !this.atMaster && this.pushedBased)
+            this.start = Collections.emptyIterator();
+        return super.processNextStart();
+    }
+
+    @Override
+    public void setAtMaster(final boolean atMaster) {
+        this.atMaster = atMaster;
+    }
+
+    @Override
+    public void setPushBased(final boolean pushBased) {
+        this.pushedBased = pushBased;
+    }
 }