You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/19 21:59:02 UTC

tinkerpop git commit: got LocalBarriers with SideEffects working in GraphActors. There are only 3 tests that fail -- having to do with nested group().groupCount()-style things. Almost done.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 a2dbe7b0b -> 6b6df35d0


got LocalBarriers with SideEffects working in GraphActors. There are only 3 tests that fail -- having to do with nested group().groupCount()-style things. Almost done.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/6b6df35d
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/6b6df35d
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/6b6df35d

Branch: refs/heads/TINKERPOP-1564
Commit: 6b6df35d053aad1807ae9d72454c3cf3c4e96e9a
Parents: a2dbe7b
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 19 14:58:59 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 19 14:58:59 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actors/AkkaActorsProvider.java |  7 +---
 .../traversal/TraversalMasterProgram.java       | 35 ++++++++++++--------
 .../traversal/TraversalWorkerProgram.java       |  8 ++++-
 .../traversal/WorkerTraversalSideEffects.java   |  1 +
 .../traversal/message/BarrierAddMessage.java    |  3 +-
 5 files changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
index a9b5820..7bc88fb 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
@@ -29,15 +29,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ComplexTest;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderTest;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess;
@@ -68,12 +64,11 @@ public class AkkaActorsProvider extends AbstractGraphProvider {
 
     private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
             "g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name",
-            "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path",
             "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack",
             "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX",
             "g_V_repeatXdedupX_timesX2X_count",
             "g_withSackXmap__map_cloneX_V_out_out_sackXmap_a_nameX_sack",
-            //"g_V_out_group_byXlabelX_selectXpersonX_unfold_outXcreatedX_name_limitX2X",
+            "g_V_out_group_byXlabelX_selectXpersonX_unfold_outXcreatedX_name_limitX2X",
             "g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX",
             GraphTest.Traversals.class.getCanonicalName(),
             ComplexTest.Traversals.class.getCanonicalName(),

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index a6bb94e..e28919b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -46,7 +46,9 @@ import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -57,6 +59,7 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
     private final Traversal.Admin<?, ?> traversal;
     private final TraversalMatrix<?, ?> matrix;
     private Map<String, Barrier> barriers = new HashMap<>();
+    private Set<String> sideEffects = new HashSet<>();
     private final TraverserSet<?> results;
     private Address.Worker leaderWorker;
     private int orderCounter = -1;
@@ -89,17 +92,29 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
             this.processTraverser((Traverser.Admin) message);
         } else if (message instanceof BarrierAddMessage) {
             final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId());
-            final Step<?, ?> step = (Step) barrier;
-            barrier.addBarrier(TraversalActorProgram.attach(((BarrierAddMessage) message).getBarrier(), this.master.partitioner().getGraph()));
-            this.barriers.put(step.getId(), barrier);
+            if (!(barrier instanceof LocalBarrier))
+                barrier.addBarrier(TraversalActorProgram.attach(((BarrierAddMessage) message).getBarrier(), this.master.partitioner().getGraph()));
+            if (barrier instanceof SideEffectCapable)
+                this.sideEffects.add(((SideEffectCapable) barrier).getSideEffectKey());
+            this.barriers.put(((Step) barrier).getId(), barrier);
         } else if (message instanceof SideEffectAddMessage) {
-            this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue());
+            final SideEffectAddMessage sideEffectAddMessage = (SideEffectAddMessage) message;
+            this.traversal.getSideEffects().add(sideEffectAddMessage.getKey(), sideEffectAddMessage.getValue());
+            this.sideEffects.add(sideEffectAddMessage.getKey());
         } else if (message instanceof Terminate) {
             assert Terminate.YES == message;
-            if (!this.barriers.isEmpty()) {
+            if (!this.barriers.isEmpty() || !this.sideEffects.isEmpty()) {
+                // process all side-effect updates
+                for (final String key : this.sideEffects) {
+                    this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
+                }
+                // process all barriers
                 for (final Barrier barrier : this.barriers.values()) {
                     final Step<?, ?> step = (Step) barrier;
-                    if (!(barrier instanceof LocalBarrier)) {
+                    if (barrier instanceof LocalBarrier) { // the barriers are distributed amongst the workers
+                        this.broadcast(new BarrierDoneMessage(barrier));
+                        barrier.done();
+                    } else {                               // the barrier is at the master
                         this.orderBarrier(step);
                         if (step instanceof OrderGlobalStep) this.orderCounter = 0;
                         while (step.hasNext()) {
@@ -107,15 +122,9 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
                                     step.next() :
                                     new OrderedTraverser<>(step.next(), this.orderCounter++));
                         }
-                    } else {
-                        if (step instanceof SideEffectCapable) {
-                            final String key = ((SideEffectCapable) step).getSideEffectKey();
-                            this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
-                        }
-                        this.broadcast(new BarrierDoneMessage(barrier));
-                        barrier.done();
                     }
                 }
+                this.sideEffects.clear();
                 this.barriers.clear();
                 this.master.send(this.leaderWorker, Terminate.MAYBE);
             } else {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index b4df2b6..05735a4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -118,8 +119,13 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
             this.terminate = (Terminate) message;
             if (!this.barriers.isEmpty()) {
                 for (final Barrier barrier : this.barriers.values()) {
-                    while (barrier.hasNextBarrier()) {
+                    if (barrier instanceof LocalBarrier) {
+                        barrier.processAllStarts();
                         this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+                    } else {
+                        while (barrier.hasNextBarrier()) {
+                            this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+                        }
                     }
                 }
                 this.barriers.clear();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
index 7337316..6dcde9d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
@@ -43,6 +43,7 @@ public final class WorkerTraversalSideEffects implements TraversalSideEffects {
     }
 
     public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
+        assert !(sideEffects instanceof WorkerTraversalSideEffects);
         this.sideEffects = sideEffects;
         this.worker = worker;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
index b17e83c..4da7aac 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
 
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -35,7 +36,7 @@ public final class BarrierAddMessage {
     }
 
     public BarrierAddMessage(final Barrier barrier) {
-        this.barrier = barrier.nextBarrier();
+        this.barrier = barrier instanceof LocalBarrier ? null : barrier.nextBarrier();
         this.stepId = ((Step) barrier).getId();
     }