You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/24 22:22:15 UTC

tinkerpop git commit: Made Barrier completion a little safer by requiring two rounds of terminate tokens. That is, all workers are first told to complete their barriers. Once all workers have completed their barriers, they proceed to drain them.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 0d5db8208 -> 7dfd7cfc9


Made Barrier completion a little safer by requiring two rounds of terminate tokens. That is, all workers are first told to complete their barriers. Once all workers have completed their barriers, they proceed to drain them. This makes a flaky dedup() test pass.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/7dfd7cfc
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/7dfd7cfc
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/7dfd7cfc

Branch: refs/heads/TINKERPOP-1564
Commit: 7dfd7cfc99a95311582c32989b58e63613acac83
Parents: 0d5db82
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 24 15:22:11 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 24 15:22:11 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actors/AkkaActorsProvider.java |  1 -
 .../akka/process/actors/AkkaPlayTest.java       |  1 +
 .../traversal/TraversalMasterProgram.java       | 39 +++++++++++++-------
 .../traversal/step/filter/DedupGlobalStep.java  | 29 +++++++--------
 .../gremlin/process/actors/GraphActorsTest.java |  5 +--
 .../actors/TestSetupTerminateActorProgram.java  |  4 +-
 6 files changed, 44 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7dfd7cfc/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
index f610a97..c76db50 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
@@ -48,7 +48,6 @@ public class AkkaActorsProvider extends AbstractTinkerGraphProvider {
 
     private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
             "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack",
-            "g_V_repeatXdedupX_timesX2X_count",
             "g_withSackXmap__map_cloneX_V_out_out_sackXmap_a_nameX_sack",
             "g_VX1X_sideEffectXstore_aX_name",
             "g_VX1X_out_sideEffectXincr_cX_name",

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7dfd7cfc/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
index f1bce08..32b543c 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
@@ -57,6 +57,7 @@ public class AkkaPlayTest {
                 System.out.println(i + " -- " + map);
                 assert false;
             }
+            //assert 0L == g.V().repeat(dedup()).times(2).count().next();
         }
 
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7dfd7cfc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index 90d6edb..0581ba5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -66,6 +66,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
     private int orderCounter = -1;
     private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
     private boolean voteToHalt = true;
+    private boolean barriersDone = false;
 
     public TraversalMasterProgram(final Actor.Master<Pair<TraverserSet<R>, Map<String, Object>>> master, final Traversal.Admin<?, R> traversal) {
         this.traversal = traversal;
@@ -102,6 +103,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
             if (barrier instanceof SideEffectCapStep)
                 this.sideEffects.addAll(((SideEffectCapStep) barrier).getSideEffectKeys());
             this.barriers.put(((Step) barrier).getId(), barrier);
+            this.barriersDone = false;
         } else if (message instanceof SideEffectAddMessage) {
             final SideEffectAddMessage sideEffectAddMessage = (SideEffectAddMessage) message;
             this.traversal.getSideEffects().add(sideEffectAddMessage.getKey(), TraversalActorProgram.attach(sideEffectAddMessage.getValue(), this.master.partitioner().getGraph()));
@@ -118,23 +120,31 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
                 this.voteToHalt = false;
                 this.master.send(this.neighborAddress, Terminate.NO);
             } else if (this.voteToHalt && !this.barriers.isEmpty()) {
-                // process all barriers
-                for (final Barrier barrier : this.barriers.values()) {
-                    this.broadcast(new BarrierDoneMessage(barrier));
-                    final Step<?, ?> step = (Step) barrier;
-                    if (barrier instanceof LocalBarrier)  // the barriers are distributed amongst the workers
-                        barrier.done();
-                    else {                                // the barrier is at the master
-                        this.orderBarrier(step);
-                        if (step instanceof OrderGlobalStep) this.orderCounter = 0;
-                        while (step.hasNext()) {
-                            this.sendTraverser(-1 == this.orderCounter ?
-                                    step.next() :
-                                    new OrderedTraverser<>(step.next(), this.orderCounter++));
+                if (!this.barriersDone) {
+                    // tell all workers that the barrier is complete
+                    for (final Barrier barrier : this.barriers.values()) {
+                        this.broadcast(new BarrierDoneMessage(barrier));
+                    }
+                    this.barriersDone = true;
+                } else {
+                    // process all barriers
+                    for (final Barrier barrier : this.barriers.values()) {
+                        final Step<?, ?> step = (Step) barrier;
+                        if (barrier instanceof LocalBarrier)  // the barriers are distributed amongst the workers
+                            barrier.done();
+                        else {                                // the barrier is at the master
+                            this.orderBarrier(step);
+                            if (step instanceof OrderGlobalStep) this.orderCounter = 0;
+                            while (step.hasNext()) {
+                                this.sendTraverser(-1 == this.orderCounter ?
+                                        step.next() :
+                                        new OrderedTraverser<>(step.next(), this.orderCounter++));
+                            }
                         }
                     }
+                    this.barriers.clear();
+                    this.barriersDone = false;
                 }
-                this.barriers.clear();
                 this.voteToHalt = false;
                 this.master.send(this.neighborAddress, Terminate.NO);
             } else if (!this.voteToHalt) {
@@ -185,6 +195,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
             step.addStart(traverser);
             if (step instanceof Barrier) {
                 this.barriers.put(step.getId(), (Barrier) step);
+                this.barriersDone = false;
             } else {
                 while (step.hasNext()) {
                     this.processTraverser(step.next());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7dfd7cfc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index 8446540..c6c4a80 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -32,7 +32,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Scoping;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
@@ -88,16 +87,17 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     protected Traverser.Admin<S> processNextStart() {
-        if (null != this.barrier) {
-            this.barrierIterator = this.barrier.entrySet().iterator();
-            this.barrier = null;
-        }
-        while (this.barrierIterator != null && this.barrierIterator.hasNext()) {
-            if (null == this.barrierIterator)
-                this.barrierIterator = this.barrier.entrySet().iterator();
-            final Map.Entry<Object, Traverser.Admin<S>> entry = this.barrierIterator.next();
-            if (this.duplicateSet.add(entry.getKey()))
-                return PathProcessor.processTraverserPathLabels(entry.getValue(), this.keepLabels);
+        while (null != this.barrier || null != this.barrierIterator) {
+            if (null == this.barrierIterator) {
+                this.barrierIterator = null == this.barrier ? Collections.emptyIterator() : this.barrier.entrySet().iterator();
+                this.barrier = null;
+            }
+            while (this.barrierIterator.hasNext()) {
+                final Map.Entry<Object, Traverser.Admin<S>> entry = this.barrierIterator.next();
+                if (this.duplicateSet.add(entry.getKey()))
+                    return PathProcessor.processTraverserPathLabels(entry.getValue(), this.keepLabels);
+            }
+            this.barrierIterator = null;
         }
         return PathProcessor.processTraverserPathLabels(super.processNextStart(), this.keepLabels);
     }
@@ -186,7 +186,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
             } else {
                 object = TraversalUtil.applyNullable(traverser, this.dedupTraversal);
             }
-            if (!map.containsKey(object)) {
+            if (this.duplicateSet.add(object) && !map.containsKey(object)) {
                 traverser.setBulk(1L);
                 // traverser.detach();
                 traverser.set(DetachedFactory.detach(traverser.get(), true)); // TODO: detect required detachment accordingly
@@ -195,10 +195,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
         }
         this.barrier = null;
         this.barrierIterator = null;
-        if (map.isEmpty())
-            throw FastNoSuchElementException.instance();
-        else
-            return map;
+        return map;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7dfd7cfc/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
index 6018e64..80c9929 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
@@ -75,10 +75,9 @@ public class GraphActorsTest extends AbstractGremlinProcessTest {
 
     @Test
     public void shouldSetupAndTerminateProperly() throws Exception {
-
         for (int i = 1; i < 10; i++) {
-            final GraphActors actors = graphProvider.getGraphActors(graph);
-            final List<Integer> counts = (List)((ActorsResult) actors.workers(i).program(new TestSetupTerminateActorProgram()).submit(graph).get()).getResult();
+            final GraphActors<List<Integer>> actors = graphProvider.getGraphActors(graph);
+            final List<Integer> counts = actors.workers(i).program(new TestSetupTerminateActorProgram()).submit(graph).get().getResult();
             assertEquals(i, counts.get(0).intValue());
             assertEquals(i, counts.get(1).intValue());
             assertEquals(1, counts.get(2).intValue());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7dfd7cfc/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
index 59795ab..58d2d50 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
@@ -20,6 +20,7 @@
 package org.apache.tinkerpop.gremlin.process.actors;
 
 import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -29,7 +30,7 @@ import static org.junit.Assert.fail;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-class TestSetupTerminateActorProgram implements ActorProgram {
+class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> {
 
     private static final String WORKER_SETUP = "workerSetup";
     private static final String WORKER_TERMINATE = "workerTerminate";
@@ -105,6 +106,7 @@ class TestSetupTerminateActorProgram implements ActorProgram {
             @Override
             public void terminate() {
                 assertEquals(this.workerSetup, this.workerTerminate);
+                assertEquals(this.workerSetup, master.workers().size());
                 assertEquals(1, this.masterSetup);
                 assertEquals(0, this.masterTerminate);
                 master.setResult(Arrays.asList(