You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/25 14:08:28 UTC

tinkerpop git commit: javadoc'ing and code reorganization.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 b81dbc3b7 -> 12cce2b82


javadoc'ing and code reorganization.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/12cce2b8
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/12cce2b8
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/12cce2b8

Branch: refs/heads/TINKERPOP-1564
Commit: 12cce2b8249a936ea0db8245dc8e044b2f3f7e59
Parents: b81dbc3
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 25 07:08:24 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 25 07:08:24 2017 -0700

----------------------------------------------------------------------
 .../tinkerpop/gremlin/process/actors/Actor.java | 18 +++-
 .../gremlin/process/actors/ActorProgram.java    | 35 ++++---
 .../traversal/TraversalMasterProgram.java       | 14 ++-
 .../traversal/TraversalWorkerProgram.java       | 96 ++++++++++----------
 .../traversal/step/filter/DedupGlobalStep.java  |  2 +-
 5 files changed, 102 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
index 0f75e20..aec8632 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
@@ -36,7 +36,7 @@ import java.util.List;
 public interface Actor {
 
     /**
-     * Get the {@link Address} of the actors.
+     * Get the {@link Address} of the actor.
      *
      * @return the actor's address
      */
@@ -81,14 +81,30 @@ public interface Actor {
          */
         public Partitioner partitioner();
 
+        /**
+         * The master actor is responsible for yielding the final result of the computation.
+         *
+         * @param result the final result of the computation
+         */
         public void setResult(final R result);
 
     }
 
     public interface Worker extends Actor {
 
+        /**
+         * Get the worker actor's address.
+         *
+         * @return the worker actor's address
+         */
+        @Override
         public Address.Worker address();
 
+        /**
+         * Get the address of the worker's master actor.
+         *
+         * @return the master actor's address
+         */
         public Address.Master master();
 
         /**

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
index 4063090..d1db2e1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
@@ -114,46 +114,59 @@ public interface ActorProgram<R> extends Cloneable {
     }
 
     /**
-     * The Worker program is executed by a worker process in the {@link GraphActors} system.
+     * The Master program is executed by the master process in the {@link GraphActors} system.
      * There are many workers and a single master.
-     * All workers execute the same program.
      *
-     * @param <M> The message type accepted by the worker
+     * @param <M> The message type accepted by the master
      */
-    public static interface Worker<M> {
-
+    public static interface Master<M> {
         /**
-         * This method is evaluated when the worker process is spawned.
+         * This method is evaluated when the master actor is spawned.
          */
         public void setup();
 
         /**
-         * This method is evaluated when the worker receives a new message.
+         * This method is evaluated when the master actor receives a new message.
          *
          * @param message the received message
          */
         public void execute(final M message);
 
         /**
-         * This method is evaluated when the worker process is destroyed.
+         * This method is evaluated when the master actor is destroyed.
          */
         public void terminate();
 
     }
 
+
     /**
-     * The Master program is executed by the master process in the {@link GraphActors} system.
+     * The Worker program is executed by a worker process in the {@link GraphActors} system.
      * There are many workers and a single master.
+     * All workers execute the same program.
      *
-     * @param <M> The message type accepted by the master
+     * @param <M> The message type accepted by the worker
      */
-    public static interface Master<M> {
+    public static interface Worker<M> {
+
+        /**
+         * This method is evaluated when the worker actor is spawned.
+         */
         public void setup();
 
+        /**
+         * This method is evaluated when the worker receives a new message.
+         *
+         * @param message the received message
+         */
         public void execute(final M message);
 
+        /**
+         * This method is evaluated when the worker actor is destroyed.
+         */
         public void terminate();
 
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index 0581ba5..7e5340d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
@@ -198,7 +199,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
                 this.barriersDone = false;
             } else {
                 while (step.hasNext()) {
-                    this.processTraverser(step.next());
+                    this.sendTraverser(step.next());
                 }
             }
         }
@@ -210,11 +211,18 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
             this.traverserResults.add(traverser);
             return;
         }
+        //////
         this.voteToHalt = false;
-        if (traverser.get() instanceof Element)
+        if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep) {
+            // mid-traversal V()/E() traversers need to be broadcasted across all workers/partitions
+            for (final Address.Worker worker : this.master.workers()) {
+                this.master.send(worker, traverser);
+            }
+        } else if (traverser.get() instanceof Element) {
             this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().find((Element) traverser.get())), this.detachTraverser(traverser));
-        else
+        } else {
             this.master.send(this.master.address(), this.detachTraverser(traverser));
+        }
     }
 
     private void orderBarrier(final Step step) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index 31d8a0f..2f86e58 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -48,7 +48,7 @@ import java.util.Map;
  */
 final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
 
-    private final Actor.Worker self;
+    private final Actor.Worker worker;
     private final TraversalMatrix<?, ?> matrix;
     private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
     //
@@ -56,10 +56,10 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
     private boolean voteToHalt = true;
     private Map<String, Barrier> barriers = new HashMap<>();
 
-    public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
-        this.self = self;
+    public TraversalWorkerProgram(final Actor.Worker worker, final Traversal.Admin<?, ?> traversal) {
+        this.worker = worker;
         // create a pass-through side-effects which sends SideEffectAddMessages to the master actor
-        final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
+        final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.worker);
         TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
         this.matrix = new TraversalMatrix<>(traversal);
         // configure distributing and pushing semantics for worker execution
@@ -69,12 +69,12 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
         TraversalHelper.getStepsOfAssignableClassRecursively(GraphStep.class, this.matrix.getTraversal()).forEach(graphStep -> {
             if (0 == graphStep.getIds().length)
                 graphStep.setIteratorSupplier(graphStep.returnsVertex() ?
-                        this.self.partition()::vertices :
-                        this.self.partition()::edges);
+                        this.worker.partition()::vertices :
+                        this.worker.partition()::edges);
             else {
                 graphStep.setIteratorSupplier(graphStep.returnsVertex() ?
-                        () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains) :
-                        () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
+                        () -> IteratorUtils.filter(worker.partition().vertices(graphStep.getIds()), this.worker.partition()::contains) :
+                        () -> IteratorUtils.filter(worker.partition().edges(graphStep.getIds()), this.worker.partition()::contains));
             }
         });
     }
@@ -82,10 +82,10 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
     @Override
     public void setup() {
         // create termination ring topology
-        final int i = this.self.workers().indexOf(this.self.address());
-        this.neighborAddress = i == this.self.workers().size() - 1 ? this.self.master() : this.self.workers().get(i + 1);
-        for (int j = 0; j < this.self.partition().partitioner().getPartitions().size(); j++) {
-            this.partitionToWorkerMap.put(this.self.partition().partitioner().getPartitions().get(j), this.self.workers().get(j));
+        final int i = this.worker.workers().indexOf(this.worker.address());
+        this.neighborAddress = i == this.worker.workers().size() - 1 ? this.worker.master() : this.worker.workers().get(i + 1);
+        for (int j = 0; j < this.worker.partition().partitioner().getPartitions().size(); j++) {
+            this.partitionToWorkerMap.put(this.worker.partition().partitioner().getPartitions().get(j), this.worker.workers().get(j));
         }
         // once loaded, start processing start step
         final Step<?, ?> step = this.matrix.getTraversal().getStartStep();
@@ -97,11 +97,35 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
     @Override
     public void execute(final Object message) {
         if (message instanceof Traverser.Admin) {
-            this.processTraverser((Traverser.Admin) message);
+            ////////// PROCESS TRAVERSER //////////
+            final Traverser.Admin traverser = (Traverser.Admin) message;
+            // only mid-traversal V()/E() traversers can be non-locally processed
+            assert !(traverser.get() instanceof Element) ||
+                    this.worker.partition().contains((Element) traverser.get()) ||
+                    this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep;
+            if (traverser.isHalted()) {
+                // send halted traversers to master
+                this.sendTraverser(traverser);
+            } else {
+                // locally process traverser
+                TraversalActorProgram.attach(traverser, this.worker.partition());
+                final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+                step.addStart(traverser);
+                if (step instanceof Barrier) {
+                    this.barriers.put(step.getId(), (Barrier) step);
+                } else {
+                    while (step.hasNext()) {
+                        this.sendTraverser(step.next());
+                    }
+                }
+            }
         } else if (message instanceof SideEffectSetMessage) {
-            this.matrix.getTraversal().getSideEffects().
-                    set(((SideEffectSetMessage) message).getKey(), TraversalActorProgram.attach(((SideEffectSetMessage) message).getValue(), this.self.partition()));
+            ////////// UPDATE LOCAL SIDE-EFFECTS //////////
+            this.matrix.getTraversal().getSideEffects().set(
+                    ((SideEffectSetMessage) message).getKey(),
+                    TraversalActorProgram.attach(((SideEffectSetMessage) message).getValue(), this.worker.partition()));
         } else if (message instanceof BarrierDoneMessage) {
+            ////////// FINALIZE BARRIER SYNCHRONIZATION //////////
             final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId());
             if (step instanceof LocalBarrier) { // the worker drains the local barrier
                 while (step.hasNext()) {
@@ -110,15 +134,16 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
             } else
                 ((Barrier) step).done();       // the master drains the global barrier
         } else if (message instanceof Terminate) {
+            ////////// DETERMINE TERMINATION CONDITION //////////
             final Terminate terminate = (Terminate) message;
             if (this.voteToHalt && !this.barriers.isEmpty()) {
                 for (final Barrier barrier : this.barriers.values()) {
                     if (barrier instanceof LocalBarrier) {
                         barrier.processAllStarts();
-                        this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+                        this.worker.send(this.worker.master(), new BarrierAddMessage(barrier));
                     } else {
                         while (barrier.hasNextBarrier()) {
-                            this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+                            this.worker.send(this.worker.master(), new BarrierAddMessage(barrier));
                         }
                     }
                 }
@@ -126,7 +151,7 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
                 this.voteToHalt = false;
             }
             // use termination token to determine termination condition
-            this.self.send(this.neighborAddress, this.voteToHalt ? terminate : Terminate.NO);
+            this.worker.send(this.neighborAddress, this.voteToHalt ? terminate : Terminate.NO);
             this.voteToHalt = true;
         } else {
             throw new IllegalArgumentException("The following message is unknown: " + message);
@@ -140,46 +165,23 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
 
     //////////////
 
-    private void processTraverser(final Traverser.Admin traverser) {
-        // only mid-traversal V()/E() traversers can be non-locally processed
-        assert !(traverser.get() instanceof Element) ||
-                this.self.partition().contains((Element) traverser.get()) ||
-                this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep;
-        if (traverser.isHalted()) {
-            // send halted traversers to master
-            this.sendTraverser(traverser);
-        } else {
-            // locally process traverser
-            TraversalActorProgram.attach(traverser, this.self.partition());
-            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
-            step.addStart(traverser);
-            if (step instanceof Barrier) {
-                this.barriers.put(step.getId(), (Barrier) step);
-            } else {
-                while (step.hasNext()) {
-                    this.sendTraverser(step.next());
-                }
-            }
-        }
-    }
-
     private void sendTraverser(final Traverser.Admin traverser) {
         this.voteToHalt = false;
         this.detachTraverser(traverser);
         if (traverser.isHalted()) {
             // send halted traversers to master
-            this.self.send(this.self.master(), traverser);
+            this.worker.send(this.worker.master(), traverser);
         } else if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep) {
             // mid-traversal V()/E() traversers need to be broadcasted across all workers/partitions
-            for (final Address.Worker worker : this.self.workers()) {
-                this.self.send(worker, traverser);
+            for (final Address.Worker worker : this.worker.workers()) {
+                this.worker.send(worker, traverser);
             }
-        } else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get())) {
+        } else if (traverser.get() instanceof Element && !this.worker.partition().contains((Element) traverser.get())) {
             // if the traverser references a non-local element, send the traverser to the appropriate worker/partition
-            this.self.send(this.partitionToWorkerMap.get(this.self.partition().partitioner().find((Element) traverser.get())), traverser);
+            this.worker.send(this.partitionToWorkerMap.get(this.worker.partition().partitioner().find((Element) traverser.get())), traverser);
         } else {
             // if the traverser is local to the worker, send traverser to self
-            this.self.send(this.self.address(), traverser);
+            this.worker.send(this.worker.address(), traverser);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index 7724dae..22fdcf1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -69,7 +69,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     protected boolean filter(final Traverser.Admin<S> traverser) {
-        if (this.pushBased && this.atWorker) return false;
+        if (this.pushBased && this.atWorker) return false; // todo: study why this is needed
         traverser.setBulk(1);
         if (null == this.dedupLabels) {
             return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal));