You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/23 21:24:41 UTC
[45/50] [abbrv] tinkerpop git commit: Got LocalBarriers with
SideEffects working in GraphActors. Only 3 tests still fail --
all involving nested group().groupCount()-style traversals. Almost done.
Got LocalBarriers with SideEffects working in GraphActors. Only 3 tests still fail -- all involving nested group().groupCount()-style traversals. Almost done.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/51f0898f
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/51f0898f
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/51f0898f
Branch: refs/heads/TINKERPOP-1564
Commit: 51f0898f5aff47b46e5d0646894ca7f2d85b1993
Parents: 15e312c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 19 14:58:59 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 23 14:22:53 2017 -0700
----------------------------------------------------------------------
.../akka/process/actors/AkkaActorsProvider.java | 7 +---
.../traversal/TraversalMasterProgram.java | 35 ++++++++++++--------
.../traversal/TraversalWorkerProgram.java | 8 ++++-
.../traversal/WorkerTraversalSideEffects.java | 1 +
.../traversal/message/BarrierAddMessage.java | 3 +-
5 files changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/51f0898f/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
index a9b5820..7bc88fb 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
@@ -29,15 +29,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.step.ComplexTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess;
@@ -68,12 +64,11 @@ public class AkkaActorsProvider extends AbstractGraphProvider {
private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
"g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name",
- "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path",
"g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack",
"g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX",
"g_V_repeatXdedupX_timesX2X_count",
"g_withSackXmap__map_cloneX_V_out_out_sackXmap_a_nameX_sack",
- //"g_V_out_group_byXlabelX_selectXpersonX_unfold_outXcreatedX_name_limitX2X",
+ "g_V_out_group_byXlabelX_selectXpersonX_unfold_outXcreatedX_name_limitX2X",
"g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX",
GraphTest.Traversals.class.getCanonicalName(),
ComplexTest.Traversals.class.getCanonicalName(),
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/51f0898f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index a6bb94e..e28919b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -46,7 +46,9 @@ import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -57,6 +59,7 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
private final Traversal.Admin<?, ?> traversal;
private final TraversalMatrix<?, ?> matrix;
private Map<String, Barrier> barriers = new HashMap<>();
+ private Set<String> sideEffects = new HashSet<>();
private final TraverserSet<?> results;
private Address.Worker leaderWorker;
private int orderCounter = -1;
@@ -89,17 +92,29 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
this.processTraverser((Traverser.Admin) message);
} else if (message instanceof BarrierAddMessage) {
final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId());
- final Step<?, ?> step = (Step) barrier;
- barrier.addBarrier(TraversalActorProgram.attach(((BarrierAddMessage) message).getBarrier(), this.master.partitioner().getGraph()));
- this.barriers.put(step.getId(), barrier);
+ if (!(barrier instanceof LocalBarrier))
+ barrier.addBarrier(TraversalActorProgram.attach(((BarrierAddMessage) message).getBarrier(), this.master.partitioner().getGraph()));
+ if (barrier instanceof SideEffectCapable)
+ this.sideEffects.add(((SideEffectCapable) barrier).getSideEffectKey());
+ this.barriers.put(((Step) barrier).getId(), barrier);
} else if (message instanceof SideEffectAddMessage) {
- this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue());
+ final SideEffectAddMessage sideEffectAddMessage = (SideEffectAddMessage) message;
+ this.traversal.getSideEffects().add(sideEffectAddMessage.getKey(), sideEffectAddMessage.getValue());
+ this.sideEffects.add(sideEffectAddMessage.getKey());
} else if (message instanceof Terminate) {
assert Terminate.YES == message;
- if (!this.barriers.isEmpty()) {
+ if (!this.barriers.isEmpty() || !this.sideEffects.isEmpty()) {
+ // process all side-effect updates
+ for (final String key : this.sideEffects) {
+ this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
+ }
+ // process all barriers
for (final Barrier barrier : this.barriers.values()) {
final Step<?, ?> step = (Step) barrier;
- if (!(barrier instanceof LocalBarrier)) {
+ if (barrier instanceof LocalBarrier) { // the barriers are distributed amongst the workers
+ this.broadcast(new BarrierDoneMessage(barrier));
+ barrier.done();
+ } else { // the barrier is at the master
this.orderBarrier(step);
if (step instanceof OrderGlobalStep) this.orderCounter = 0;
while (step.hasNext()) {
@@ -107,15 +122,9 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
step.next() :
new OrderedTraverser<>(step.next(), this.orderCounter++));
}
- } else {
- if (step instanceof SideEffectCapable) {
- final String key = ((SideEffectCapable) step).getSideEffectKey();
- this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
- }
- this.broadcast(new BarrierDoneMessage(barrier));
- barrier.done();
}
}
+ this.sideEffects.clear();
this.barriers.clear();
this.master.send(this.leaderWorker, Terminate.MAYBE);
} else {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/51f0898f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index b4df2b6..05735a4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -118,8 +119,13 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
this.terminate = (Terminate) message;
if (!this.barriers.isEmpty()) {
for (final Barrier barrier : this.barriers.values()) {
- while (barrier.hasNextBarrier()) {
+ if (barrier instanceof LocalBarrier) {
+ barrier.processAllStarts();
this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+ } else {
+ while (barrier.hasNextBarrier()) {
+ this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+ }
}
}
this.barriers.clear();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/51f0898f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
index 7337316..6dcde9d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
@@ -43,6 +43,7 @@ public final class WorkerTraversalSideEffects implements TraversalSideEffects {
}
public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
+ assert !(sideEffects instanceof WorkerTraversalSideEffects);
this.sideEffects = sideEffects;
this.worker = worker;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/51f0898f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
index b17e83c..4da7aac 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -35,7 +36,7 @@ public final class BarrierAddMessage {
}
public BarrierAddMessage(final Barrier barrier) {
- this.barrier = barrier.nextBarrier();
+ this.barrier = barrier instanceof LocalBarrier ? null : barrier.nextBarrier();
this.stepId = ((Step) barrier).getId();
}