You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/19 21:06:28 UTC
[07/50] [abbrv] tinkerpop git commit: added Pushing and Distributing
which are currently the two constructs of GraphComputing. GraphComputing is
now deprecated with its corresponding methods calling the respective
Pushing/Distributing methods. Pushing me
added Pushing and Distributing which are currently the two constructs of GraphComputing. GraphComputing is now deprecated with its corresponding methods calling the respective Pushing/Distributing methods. Pushing means the step is using push-based semantics. Distributing means that the step must be aware of whether it is executing in a master or worker traversal. Came up with a much cleaner way to configure these that doesn't rely on using VerificationStrategies and step-by-step setting. In short, once on a master or worker machine, configure yourself accordingly. Duh.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/58b4fd4b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/58b4fd4b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/58b4fd4b
Branch: refs/heads/TINKERPOP-1564
Commit: 58b4fd4b3829c797141ff6d96c68d3895e192344
Parents: 7d12d21
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 14 08:16:08 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 19 13:01:41 2017 -0700
----------------------------------------------------------------------
.../gremlin/akka/process/AkkaPlayTest.java | 10 +++-
.../actor/traversal/TraversalMasterProgram.java | 8 +--
.../actor/traversal/TraversalWorkerProgram.java | 6 +-
.../verification/ActorVerificationStrategy.java | 8 ---
.../process/traversal/step/Bypassing.java | 3 +
.../process/traversal/step/Distributing.java | 58 ++++++++++++++++++++
.../process/traversal/step/GraphComputing.java | 5 ++
.../gremlin/process/traversal/step/Pushing.java | 56 +++++++++++++++++++
.../traversal/step/filter/DedupGlobalStep.java | 42 +++++++++-----
.../traversal/step/filter/RangeGlobalStep.java | 16 ++++--
.../traversal/step/filter/TailGlobalStep.java | 15 +++--
.../process/traversal/step/map/GraphStep.java | 22 +++++---
.../step/sideEffect/ProfileSideEffectStep.java | 18 ++++--
.../traversal/step/util/ComputerAwareStep.java | 20 +++++--
14 files changed, 232 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
index 3874d06..6338706 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
@@ -28,6 +28,8 @@ import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.junit.Test;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
/**
@@ -40,7 +42,13 @@ public class AkkaPlayTest {
final Graph graph = TinkerGraph.open();
graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
GraphTraversalSource g = graph.traversal().withStrategies(new ActorProgramStrategy(AkkaGraphActors.class, new HashPartitioner(graph.partitioner(), 3)));
- System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
+ // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
+
+ for (int i = 0; i < 10000; i++) {
+ if(12l != g.V().union(out(), in()).values("name").count().next())
+ System.out.println(i);
+ }
+
//3, 1.9, 1
/*for (int i = 0; i < 10000; i++) {
final Graph graph = TinkerGraph.open();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index 723339d..7846d53 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -32,11 +32,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
@@ -70,6 +70,8 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
this.partitioner = partitioner;
this.results = results;
this.master = master;
+ Distributing.configure(this.traversal, true, true);
+ Pushing.configure(this.traversal, true, false);
}
@Override
@@ -86,7 +88,6 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
} else if (message instanceof BarrierAddMessage) {
final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId());
final Step<?, ?> step = (Step) barrier;
- GraphComputing.atMaster(step, true);
barrier.addBarrier(((BarrierAddMessage) message).getBarrier());
this.barriers.put(step.getId(), barrier);
} else if (message instanceof SideEffectAddMessage) {
@@ -144,7 +145,6 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
this.sendTraverser(traverser);
} else {
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- GraphComputing.atMaster(step, true);
step.addStart(traverser);
if (step instanceof Barrier) {
this.barriers.put(step.getId(), (Barrier) step);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index 08d2cff..dc03b7d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -32,7 +32,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
@@ -72,6 +74,9 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
this.matrix = new TraversalMatrix<>(traversal);
+ Distributing.configure(traversal, false, true);
+ Pushing.configure(traversal, true, false);
+ //////
final GraphStep graphStep = (GraphStep) traversal.getStartStep();
if (0 == graphStep.getIds().length)
((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges);
@@ -148,7 +153,6 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
- GraphComputing.atMaster(step, false);
step.addStart(traverser);
if (step instanceof Barrier) {
this.barriers.put(step.getId(), (Barrier) step);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
index 30ea2c5..f6e93ef 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
@@ -47,14 +47,6 @@ public final class ActorVerificationStrategy extends AbstractTraversalStrategy<T
public void apply(final Traversal.Admin<?, ?> traversal) {
if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty())
throw new VerificationException("Inject traversal currently not supported", traversal);
-
-
- final boolean globalChild = TraversalHelper.isGlobalChild(traversal);
- for (final Step<?, ?> step : traversal.getSteps()) {
- // only global children are graph computing
- if (globalChild && step instanceof GraphComputing)
- ((GraphComputing) step).onGraphComputer();
- }
}
public static ActorVerificationStrategy instance() {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java
index 2e67ff7..a9f3a82 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Bypassing.java
@@ -23,8 +23,11 @@ package org.apache.tinkerpop.gremlin.process.traversal.step;
* This is useful in for steps that need to dynamically change their behavior on {@link org.apache.tinkerpop.gremlin.process.computer.GraphComputer}.
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @deprecated As of release 3.3.0, replaced by {@link Distributing}
*/
+@Deprecated
public interface Bypassing {
+ @Deprecated
public void setBypass(final boolean bypass);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Distributing.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Distributing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Distributing.java
new file mode 100644
index 0000000..42e1b44
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Distributing.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.traversal.step;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Distributing {
+
+ /**
+ * Some steps should behave differently depending on whether they are executing at the master traversal or distributed across the worker traversals.
+ *
+ * @param atMaster whether the step is currently executing at master
+ */
+ public void setAtMaster(final boolean atMaster);
+
+ /**
+ * This static method recursively walks the traversal and calls {@link Distributing#setAtMaster(boolean)} on each {@link Distributing} step accordingly.
+ *
+ * @param traversal the traversal to recursively walk with the assumption that the provided traversal is a global traversal.
+ * @param globalMasterDistributing whether global traversals should be treated as being at a master or worker step.
+ * @param localMasterDistribution whether local traversals should be treated as being at a master or worker step.
+ */
+ public static void configure(final Traversal.Admin<?, ?> traversal, final boolean globalMasterDistributing, final boolean localMasterDistribution) {
+ for (final Step<?, ?> step : traversal.getSteps()) {
+ if (step instanceof Distributing)
+ ((Distributing) step).setAtMaster(globalMasterDistributing);
+ if (step instanceof TraversalParent) {
+ for (final Traversal.Admin<?, ?> global : ((TraversalParent) step).getGlobalChildren()) {
+ Distributing.configure(global, globalMasterDistributing, localMasterDistribution);
+ }
+ for (final Traversal.Admin<?, ?> local : ((TraversalParent) step).getLocalChildren()) {
+ Distributing.configure(local, localMasterDistribution, localMasterDistribution);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java
index cec5708..8e2b3b3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/GraphComputing.java
@@ -27,12 +27,15 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
* This method is only called for global children steps of a {@link TraversalParent}.
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @deprecated As of release 3.3.0, replaced by {@link Pushing} and {@link Distributing}
*/
+@Deprecated
public interface GraphComputing {
/**
* The step will be executing on a {@link org.apache.tinkerpop.gremlin.process.computer.GraphComputer}.
*/
+ @Deprecated
public void onGraphComputer();
/**
@@ -41,10 +44,12 @@ public interface GraphComputing {
*
* @param atMaster whether the step is currently executing at master
*/
+ @Deprecated
public default void atMaster(boolean atMaster) {
}
+ @Deprecated
public static void atMaster(final Step<?, ?> step, boolean atMaster) {
if (step instanceof GraphComputing)
((GraphComputing) step).atMaster(atMaster);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Pushing.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Pushing.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Pushing.java
new file mode 100644
index 0000000..8d8ca9d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/Pushing.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.traversal.step;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Pushing {
+
+ /**
+ * Set whether this step will be executing using a push-based model or the standard pull-based iterator model.
+ */
+ public void setPushBased(final boolean pushBased);
+
+ /**
+ * This static method recursively walks the traversal and calls {@link Pushing#setPushBased(boolean)} on each {@link Pushing} step accordingly.
+ *
+ * @param traversal the traversal to recursively walk with the assumption that the provided traversal is a global traversal.
+ * @param globalPushBased the traverser propagation semantics (push or pull) of global children (typically push).
+ * @param localPushBased the traverser propagation semantics (push or pull) of local children (typically pull).
+ */
+ public static void configure(final Traversal.Admin<?, ?> traversal, final boolean globalPushBased, final boolean localPushBased) {
+ for (final Step<?, ?> step : traversal.getSteps()) {
+ if (step instanceof Pushing)
+ ((Pushing) step).setPushBased(globalPushBased);
+ if (step instanceof TraversalParent) {
+ for (final Traversal.Admin<?, ?> global : ((TraversalParent) step).getGlobalChildren()) {
+ Pushing.configure(global, globalPushBased, localPushBased);
+ }
+ for (final Traversal.Admin<?, ?> local : ((TraversalParent) step).getLocalChildren()) {
+ Pushing.configure(local, localPushBased, localPushBased);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index 96bd0be..8446540 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -25,8 +25,10 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.Scoping;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
@@ -50,16 +52,16 @@ import java.util.function.BinaryOperator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class DedupGlobalStep<S> extends FilterStep<S> implements TraversalParent, Scoping, GraphComputing, Barrier<Map<Object, Traverser.Admin<S>>>, ByModulating, PathProcessor {
+public final class DedupGlobalStep<S> extends FilterStep<S> implements TraversalParent, Scoping, Barrier<Map<Object, Traverser.Admin<S>>>, ByModulating, PathProcessor, Distributing, Pushing, GraphComputing {
private Traversal.Admin<S, Object> dedupTraversal = null;
private Set<Object> duplicateSet = new HashSet<>();
- private boolean onGraphComputer = false;
private final Set<String> dedupLabels;
private Set<String> keepLabels;
- private boolean executingAtMaster = false;
private Map<Object, Traverser.Admin<S>> barrier;
private Iterator<Map.Entry<Object, Traverser.Admin<S>>> barrierIterator;
+ private boolean atWorker = true;
+ private boolean pushBased = false;
public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) {
super(traversal);
@@ -68,8 +70,8 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
@Override
protected boolean filter(final Traverser.Admin<S> traverser) {
- if (this.onGraphComputer && !this.executingAtMaster) return true;
- traverser.setBulk(1L);
+ if (this.pushBased && this.atWorker) return true;
+ traverser.setBulk(1);
if (null == this.dedupLabels) {
return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal));
} else {
@@ -80,11 +82,6 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
}
@Override
- public void atMaster(final boolean atMaster) {
- this.executingAtMaster = atMaster;
- }
-
- @Override
public ElementRequirement getMaxRequirement() {
return null == this.dedupLabels ? ElementRequirement.ID : PathProcessor.super.getMaxRequirement();
}
@@ -161,11 +158,6 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
}
@Override
- public void onGraphComputer() {
- this.onGraphComputer = true;
- }
-
- @Override
public Set<String> getScopeKeys() {
return null == this.dedupLabels ? Collections.emptySet() : this.dedupLabels;
}
@@ -231,4 +223,24 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
public Set<String> getKeepLabels() {
return this.keepLabels;
}
+
+ @Override
+ public void setAtMaster(final boolean atMaster) {
+ this.atWorker = !atMaster;
+ }
+
+ @Override
+ public void setPushBased(final boolean pushBased) {
+ this.pushBased = pushBased;
+ }
+
+ @Override
+ public void onGraphComputer() {
+ this.setPushBased(true);
+ }
+
+ @Override
+ public void atMaster(final boolean atMaster) {
+ this.setAtMaster(atMaster);
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java
index 9700870..c361321 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/RangeGlobalStep.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
import org.apache.tinkerpop.gremlin.process.traversal.step.Ranging;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
@@ -41,12 +42,12 @@ import java.util.function.BinaryOperator;
* @author Bob Briody (http://bobbriody.com)
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class RangeGlobalStep<S> extends FilterStep<S> implements Ranging, Bypassing, Barrier<TraverserSet<S>> {
+public final class RangeGlobalStep<S> extends FilterStep<S> implements Ranging, Bypassing, Barrier<TraverserSet<S>>, Distributing {
private long low;
private final long high;
private AtomicLong counter = new AtomicLong(0l);
- private boolean bypass;
+ private boolean atMaster = true;
public RangeGlobalStep(final Traversal.Admin traversal, final long low, final long high) {
super(traversal);
@@ -59,7 +60,7 @@ public final class RangeGlobalStep<S> extends FilterStep<S> implements Ranging,
@Override
protected boolean filter(final Traverser.Admin<S> traverser) {
- if (this.bypass) return true;
+ if (!this.atMaster) return true;
if (this.high != -1 && this.counter.get() >= this.high) {
throw FastNoSuchElementException.instance();
@@ -146,7 +147,7 @@ public final class RangeGlobalStep<S> extends FilterStep<S> implements Ranging,
@Override
public void setBypass(final boolean bypass) {
- this.bypass = bypass;
+ this.setAtMaster(!bypass);
}
@Override
@@ -161,7 +162,7 @@ public final class RangeGlobalStep<S> extends FilterStep<S> implements Ranging,
@Override
public TraverserSet<S> nextBarrier() throws NoSuchElementException {
- if(!this.starts.hasNext())
+ if (!this.starts.hasNext())
throw FastNoSuchElementException.instance();
final TraverserSet<S> barrier = new TraverserSet<>();
while (this.starts.hasNext()) {
@@ -178,6 +179,11 @@ public final class RangeGlobalStep<S> extends FilterStep<S> implements Ranging,
});
}
+ @Override
+ public void setAtMaster(final boolean atMaster) {
+ this.atMaster = atMaster;
+ }
+
////////////////
public static final class RangeBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java
index 2e31b1f..cc31e24 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/TailGlobalStep.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
@@ -39,12 +40,12 @@ import java.util.Set;
/**
* @author Matt Frantz (http://github.com/mhfrantz)
*/
-public final class TailGlobalStep<S> extends AbstractStep<S, S> implements Bypassing, Barrier<TraverserSet<S>> {
+public final class TailGlobalStep<S> extends AbstractStep<S, S> implements Bypassing, Distributing, Barrier<TraverserSet<S>> {
private final long limit;
private Deque<Traverser.Admin<S>> tail;
private long tailBulk = 0L;
- private boolean bypass = false;
+ private boolean atWorker = false;
public TailGlobalStep(final Traversal.Admin traversal, final long limit) {
super(traversal);
@@ -52,13 +53,14 @@ public final class TailGlobalStep<S> extends AbstractStep<S, S> implements Bypas
this.tail = new ArrayDeque<>((int) limit);
}
+ @Override
public void setBypass(final boolean bypass) {
- this.bypass = bypass;
+ this.setAtMaster(!bypass);
}
@Override
public Traverser.Admin<S> processNextStart() {
- if (this.bypass) {
+ if (this.atWorker) {
// If we are bypassing this step, let everything through.
return this.starts.next();
} else {
@@ -160,4 +162,9 @@ public final class TailGlobalStep<S> extends AbstractStep<S, S> implements Bypas
this.addStart(traverser);
});
}
+
+ @Override
+ public void setAtMaster(final boolean atMaster) {
+ this.atWorker = !atMaster;
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
index 87935d8..70f7925 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GraphStep.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
@@ -48,7 +49,7 @@ import java.util.function.Supplier;
* @author Marko A. Rodriguez (http://markorodriguez.com)
* @author Pieter Martin
*/
-public class GraphStep<S, E extends Element> extends AbstractStep<S, E> implements GraphComputing, AutoCloseable {
+public class GraphStep<S, E extends Element> extends AbstractStep<S, E> implements GraphComputing, Pushing, AutoCloseable {
protected final Class<E> returnClass;
protected Object[] ids;
@@ -112,12 +113,6 @@ public class GraphStep<S, E extends Element> extends AbstractStep<S, E> implemen
this.ids = new Object[0];
}
- @Override
- public void onGraphComputer() {
- this.iteratorSupplier = Collections::emptyIterator;
- convertElementsToIds();
- }
-
public void convertElementsToIds() {
for (int i = 0; i < this.ids.length; i++) { // if this is going to OLAP, convert to ids so you don't serialize elements
if (this.ids[i] instanceof Element)
@@ -189,4 +184,17 @@ public class GraphStep<S, E extends Element> extends AbstractStep<S, E> implemen
}
return false;
}
+
+ @Override
+ public void onGraphComputer() {
+ this.setPushBased(true);
+ }
+
+ @Override
+ public void setPushBased(final boolean pushBased) {
+ if (pushBased) {
+ this.iteratorSupplier = Collections::emptyIterator;
+ convertElementsToIds();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java
index be60808..f165738 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileSideEffectStep.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalMetrics;
import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -32,11 +33,11 @@ import java.util.function.Supplier;
/**
* @author Bob Briody (http://bobbriody.com)
*/
-public final class ProfileSideEffectStep<S> extends SideEffectStep<S> implements SideEffectCapable<DefaultTraversalMetrics, DefaultTraversalMetrics>, GraphComputing {
+public final class ProfileSideEffectStep<S> extends SideEffectStep<S> implements SideEffectCapable<DefaultTraversalMetrics, DefaultTraversalMetrics>, GraphComputing, Pushing {
public static final String DEFAULT_METRICS_KEY = Graph.Hidden.hide("metrics");
private String sideEffectKey;
- private boolean onGraphComputer = false;
+ private boolean pushBased = false;
public ProfileSideEffectStep(final Traversal.Admin traversal, final String sideEffectKey) {
super(traversal);
@@ -60,7 +61,7 @@ public final class ProfileSideEffectStep<S> extends SideEffectStep<S> implements
start = super.next();
return start;
} finally {
- if (!this.onGraphComputer && start == null) {
+ if (!this.pushBased && start == null) {
((DefaultTraversalMetrics) this.getTraversal().getSideEffects().get(this.sideEffectKey)).setMetrics(this.getTraversal(), false);
}
}
@@ -69,7 +70,7 @@ public final class ProfileSideEffectStep<S> extends SideEffectStep<S> implements
@Override
public boolean hasNext() {
boolean start = super.hasNext();
- if (!this.onGraphComputer && !start) {
+ if (!this.pushBased && !start) {
((DefaultTraversalMetrics) this.getTraversal().getSideEffects().get(this.sideEffectKey)).setMetrics(this.getTraversal(), false);
}
return start;
@@ -77,13 +78,18 @@ public final class ProfileSideEffectStep<S> extends SideEffectStep<S> implements
@Override
public DefaultTraversalMetrics generateFinalResult(final DefaultTraversalMetrics tm) {
- if (this.onGraphComputer)
+ if (this.pushBased)
tm.setMetrics(this.getTraversal(), true);
return tm;
}
@Override
public void onGraphComputer() {
- onGraphComputer = true;
+ this.setPushBased(true);
+ }
+
+ @Override
+ public void setPushBased(boolean pushBased) {
+ this.pushBased = pushBased;
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/58b4fd4b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
index 5acff58..4adbfa2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ComputerAwareStep.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.util;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
@@ -30,7 +31,7 @@ import java.util.NoSuchElementException;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public abstract class ComputerAwareStep<S, E> extends AbstractStep<S, E> implements GraphComputing {
+public abstract class ComputerAwareStep<S, E> extends AbstractStep<S, E> implements GraphComputing, Pushing {
private Iterator<Traverser.Admin<E>> previousIterator = EmptyIterator.instance();
@@ -49,7 +50,12 @@ public abstract class ComputerAwareStep<S, E> extends AbstractStep<S, E> impleme
@Override
public void onGraphComputer() {
- this.traverserStepIdAndLabelsSetByChild = true;
+ this.setPushBased(true);
+ }
+
+ @Override
+ public void setPushBased(final boolean pushBased) {
+ this.traverserStepIdAndLabelsSetByChild = pushBased;
}
@Override
@@ -63,9 +69,10 @@ public abstract class ComputerAwareStep<S, E> extends AbstractStep<S, E> impleme
protected abstract Iterator<Traverser.Admin<E>> computerAlgorithm() throws NoSuchElementException;
+
//////
- public static class EndStep<S> extends AbstractStep<S, S> implements GraphComputing {
+ public static class EndStep<S> extends AbstractStep<S, S> implements GraphComputing, Pushing {
public EndStep(final Traversal.Admin traversal) {
super(traversal);
@@ -88,7 +95,12 @@ public abstract class ComputerAwareStep<S, E> extends AbstractStep<S, E> impleme
@Override
public void onGraphComputer() {
- this.traverserStepIdAndLabelsSetByChild = true;
+ this.setPushBased(true);
+ }
+
+ @Override
+ public void setPushBased(final boolean pushBased) {
+ this.traverserStepIdAndLabelsSetByChild = pushBased;
}
}