You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/27 21:25:13 UTC
[11/12] tinkerpop git commit: realized that we can generalize
SingleIterationStrategy to 1) work with GraphActors and 2) work at every
walk-step. Thus,
given that GraphActors isn't about Iterations and we can make this work for
multi-stages, I renamed th
Realized that we can generalize SingleIterationStrategy to 1) work with GraphActors and 2) work at every walk-step. Thus, given that GraphActors isn't about Iterations and we can make this work for multi-stages, I renamed the strategy to MessagePassingReductionStrategy. This way we won't have to change the name in the future. Also, I did some more code cleanup and added a bunch more test cases. This is perhaps the most tested strategy.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/c2a42e27
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/c2a42e27
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/c2a42e27
Branch: refs/heads/master
Commit: c2a42e27f901b95c66e31562940a176b9f932692
Parents: 1819e05
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Jan 27 11:30:15 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 27 14:24:35 2017 -0700
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
.../MessagePassingReductionStrategy.java | 162 +++++++++++++++++++
.../optimization/SingleIterationStrategy.java | 151 -----------------
.../process/traversal/TraversalStrategies.java | 4 +-
.../MessagePassingReductionStrategyTest.java | 135 ++++++++++++++++
.../SingleIterationStrategyTest.java | 128 ---------------
.../SparkSingleIterationStrategy.java | 42 +----
.../SparkSingleIterationStrategyTest.java | 17 +-
8 files changed, 314 insertions(+), 326 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 860d401..d523ae2 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
* Fixed a bug where `PathProcessor.keepLabels` were not being pushed down into child traversals by `PathRetractionStrategy`.
* Added `SingleIterationStrategy` as a default `GraphComputer` strategy that can re-write some traversals to not require message passing.
+* Added default `MessagePassingReductionStrategy` for `GraphComputer` that can reduce the number of message passing iterations.
* Fixed a bug associated with user-provided maps and `GroupSideEffectStep`.
* `GroupBiOperator` no longer maintains a detached traversal and thus, no more side-effect related OLAP inconsistencies.
* Added `ProjectedTraverser` which wraps a traverser with a `List<Object>` of projected data.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java
new file mode 100644
index 0000000..cff152e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.IdStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.IdentityStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MessagePassingReductionStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy {
+
+ private static final MessagePassingReductionStrategy INSTANCE = new MessagePassingReductionStrategy();
+
+ private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList(
+ IncidentToAdjacentStrategy.class,
+ AdjacentToIncidentStrategy.class,
+ FilterRankingStrategy.class,
+ InlineFilterStrategy.class));
+
+ private MessagePassingReductionStrategy() {
+ }
+
+ @Override
+ public void apply(final Traversal.Admin<?, ?> traversal) {
+ // only process the first traversal step in an OLAP chain
+ TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> {
+ final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined
+ final Traversal.Admin<?, ?> compiledComputerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone();
+ if (!compiledComputerTraversal.isLocked())
+ compiledComputerTraversal.applyStrategies();
+ if (!TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(LocalStep.class, LambdaHolder.class), compiledComputerTraversal) && // don't do anything with lambdas or locals as this leads to unknown adjacencies
+ !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH) && // when dynamic detachment is provided in 3.3.0, remove this (TODO)
+ !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) && // when dynamic detachment is provided in 3.3.0, remove this (TODO)
+ !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, compiledComputerTraversal). // this is a strict precaution that could be loosed with deeper logic on barriers in global children
+ stream().
+ filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) {
+ final Traversal.Admin<?, ?> computerTraversal = step.computerTraversal.get().clone();
+ // apply the strategies up to this point
+ final List<TraversalStrategy<?>> strategies = step.getTraversal().getStrategies().toList();
+ final int indexOfStrategy = strategies.indexOf(MessagePassingReductionStrategy.instance());
+ if (indexOfStrategy > 0)
+ TraversalHelper.applySingleLevelStrategies(step.getTraversal(), computerTraversal, strategies.get(indexOfStrategy - 1).getClass());
+ if (computerTraversal.getSteps().size() > 1 &&
+ !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) &&
+ TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(VertexStep.class, EdgeVertexStep.class), computerTraversal) &&
+ TraversalHelper.isLocalStarGraph(computerTraversal)) {
+ final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, computerTraversal).orElse(null);
+ if (MessagePassingReductionStrategy.insertElementId(barrier)) // out().count() -> out().id().count()
+ TraversalHelper.insertBeforeStep(new IdStep(computerTraversal), barrier, computerTraversal);
+ if (!(MessagePassingReductionStrategy.endsWithElement(null == barrier ? computerTraversal.getEndStep() : barrier))) {
+ Traversal.Admin newChildTraversal = new DefaultGraphTraversal<>();
+ TraversalHelper.removeToTraversal(computerTraversal.getStartStep() instanceof GraphStep ?
+ computerTraversal.getStartStep().getNextStep() :
+ (Step) computerTraversal.getStartStep(), null == barrier ?
+ EmptyStep.instance() :
+ barrier, newChildTraversal);
+ newChildTraversal = newChildTraversal.getSteps().size() > 1 ? (Traversal.Admin) __.local(newChildTraversal) : newChildTraversal;
+ if (null == barrier)
+ TraversalHelper.insertTraversal(0, newChildTraversal, computerTraversal);
+ else
+ TraversalHelper.insertTraversal(barrier.getPreviousStep(), newChildTraversal, computerTraversal);
+ }
+ }
+ step.setComputerTraversal(computerTraversal);
+ }
+ });
+ }
+
+ private static boolean insertElementId(final Step<?, ?> barrier) {
+ if (!(barrier instanceof Barrier))
+ return false;
+ else if (!endsWithElement(barrier.getPreviousStep()))
+ return false;
+ else if (barrier instanceof CountGlobalStep)
+ return true;
+ else if (barrier instanceof DedupGlobalStep &&
+ ((DedupGlobalStep) barrier).getScopeKeys().isEmpty() &&
+ ((DedupGlobalStep) barrier).getLocalChildren().isEmpty() &&
+ barrier.getNextStep() instanceof CountGlobalStep)
+ return true;
+ else
+ return false;
+ }
+
+ public static final boolean endsWithElement(Step<?, ?> currentStep) {
+ while (!(currentStep instanceof EmptyStep)) {
+ if (currentStep instanceof VertexStep) // only inE, in, and out send messages
+ return (((VertexStep) currentStep).returnsVertex() || !((VertexStep) currentStep).getDirection().equals(Direction.OUT));
+ else if (currentStep instanceof EdgeVertexStep) // TODO: add GraphStep but only if its mid-traversal V()/E()
+ return true;
+ else if (currentStep instanceof TraversalFlatMapStep || currentStep instanceof TraversalMapStep || currentStep instanceof LocalStep)
+ return endsWithElement(((TraversalParent) currentStep).getLocalChildren().get(0).getEndStep());
+ else if (!(currentStep instanceof FilterStep || currentStep instanceof SideEffectStep || currentStep instanceof IdentityStep || currentStep instanceof Barrier))
+ return false;
+ currentStep = currentStep.getPreviousStep();
+ }
+ return false;
+ }
+
+ @Override
+ public Set<Class<? extends OptimizationStrategy>> applyPrior() {
+ return PRIORS;
+ }
+
+ public static MessagePassingReductionStrategy instance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
deleted file mode 100644
index 6b509ef..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
-
-import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
-import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.IdStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SingleIterationStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy {
-
- private static final SingleIterationStrategy INSTANCE = new SingleIterationStrategy();
-
- private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList(
- IncidentToAdjacentStrategy.class,
- AdjacentToIncidentStrategy.class,
- FilterRankingStrategy.class,
- InlineFilterStrategy.class));
-
- private SingleIterationStrategy() {
- }
-
- @Override
- public void apply(final Traversal.Admin<?, ?> traversal) {
- // only process the first traversal step in an OLAP chain
- TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> {
- final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined
- final Traversal.Admin<?, ?> compiledComputerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone();
- if (!compiledComputerTraversal.isLocked())
- compiledComputerTraversal.applyStrategies();
- if (!TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(LocalStep.class, LambdaHolder.class), compiledComputerTraversal) &&
- !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH) && // when dynamic detachment is provided in 3.3.0, remove this (TODO)
- !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) && // when dynamic detachment is provided in 3.3.0, remove this (TODO)
- !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, compiledComputerTraversal). // this is a strict precaution that could be loosed with deeper logic on barriers in global children
- stream().
- filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) {
- final Traversal.Admin<?, ?> computerTraversal = step.computerTraversal.getPure();
- if (computerTraversal.getSteps().size() > 1 &&
- !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) &&
- TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(VertexStep.class, EdgeVertexStep.class), computerTraversal) &&
- TraversalHelper.isLocalStarGraph(computerTraversal)) {
- final Traversal.Admin newChildTraversal = new DefaultGraphTraversal<>();
- final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, computerTraversal).orElse(null);
- if (SingleIterationStrategy.insertElementId(barrier)) // out().count() -> out().id().count()
- TraversalHelper.insertBeforeStep(new IdStep(computerTraversal), barrier, computerTraversal);
- if (!(SingleIterationStrategy.endsWithElement(null == barrier ? computerTraversal.getEndStep() : barrier.getPreviousStep()))) {
- TraversalHelper.removeToTraversal(computerTraversal.getStartStep() instanceof GraphStep ?
- computerTraversal.getStartStep().getNextStep() :
- (Step) computerTraversal.getStartStep(), null == barrier ?
- EmptyStep.instance() :
- barrier, newChildTraversal);
- if (null == barrier)
- TraversalHelper.insertTraversal(0, (Traversal.Admin) __.local(newChildTraversal), computerTraversal);
- else
- TraversalHelper.insertTraversal(barrier.getPreviousStep(), (Traversal.Admin) __.local(newChildTraversal), computerTraversal);
- }
- }
- step.setComputerTraversal(computerTraversal);
- }
- });
- }
-
- private static boolean insertElementId(final Step<?, ?> barrier) {
- if (!(barrier instanceof Barrier))
- return false;
- else if (!endsWithElement(barrier.getPreviousStep()))
- return false;
- else if (barrier instanceof CountGlobalStep)
- return true;
- else if (barrier instanceof DedupGlobalStep &&
- ((DedupGlobalStep) barrier).getScopeKeys().isEmpty() &&
- ((DedupGlobalStep) barrier).getLocalChildren().isEmpty() &&
- barrier.getNextStep() instanceof CountGlobalStep)
- return true;
- else
- return false;
- }
-
- private static boolean endsWithElement(Step<?, ?> currentStep) {
- while (!(currentStep instanceof EmptyStep)) {
- if (currentStep instanceof VertexStep || currentStep instanceof EdgeVertexStep) // TODO: add GraphStep but only if its mid-traversal V()/E()
- return true;
- else if (currentStep instanceof TraversalFlatMapStep || currentStep instanceof TraversalMapStep)
- return endsWithElement(((TraversalParent) currentStep).getLocalChildren().get(0).getEndStep());
- else if (!(currentStep instanceof FilterStep || currentStep instanceof SideEffectStep))
- return false;
- currentStep = currentStep.getPreviousStep();
- }
- return false;
- }
-
- @Override
- public Set<Class<? extends OptimizationStrategy>> applyPrior() {
- return PRIORS;
- }
-
- public static SingleIterationStrategy instance() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
index a1cb99c..7f96850 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
@@ -20,7 +20,7 @@ package org.apache.tinkerpop.gremlin.process.traversal;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
@@ -223,7 +223,7 @@ public interface TraversalStrategies extends Serializable, Cloneable {
final TraversalStrategies graphComputerStrategies = new DefaultTraversalStrategies();
graphComputerStrategies.addStrategies(
GraphFilterStrategy.instance(),
- SingleIterationStrategy.instance(),
+ MessagePassingReductionStrategy.instance(),
OrderLimitStrategy.instance(),
PathProcessorStrategy.instance(),
ComputerVerificationStrategy.instance());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java
new file mode 100644
index 0000000..1171dcd
--- /dev/null
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.bothE;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
+import static org.apache.tinkerpop.gremlin.structure.Column.keys;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(Parameterized.class)
+public class MessagePassingReductionStrategyTest {
+
+ @Parameterized.Parameter(value = 0)
+ public Traversal original;
+
+ @Parameterized.Parameter(value = 1)
+ public Traversal optimized;
+
+ @Parameterized.Parameter(value = 2)
+ public Collection<TraversalStrategy> otherStrategies;
+
+ @Test
+ public void doTest() {
+ final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>();
+ final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin());
+ rootTraversal.addStep(parent.asStep());
+ parent.setComputerTraversal(this.original.asAdmin());
+ final TraversalStrategies strategies = new DefaultTraversalStrategies();
+ strategies.addStrategies(MessagePassingReductionStrategy.instance());
+ for (final TraversalStrategy strategy : this.otherStrategies) {
+ strategies.addStrategies(strategy);
+ }
+ rootTraversal.setStrategies(strategies);
+ rootTraversal.asAdmin().applyStrategies();
+ assertEquals(this.optimized, parent.computerTraversal.get());
+ }
+
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Iterable<Object[]> generateTestParameters() {
+ final Function<Traverser<Vertex>, Vertex> mapFunction = t -> t.get().vertices(Direction.OUT).next();
+ return Arrays.asList(new Object[][]{
+ {__.V().out().count(), __.V().outE().count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())},
+ {__.V().in().count(), __.V().local(__.inE().id()).count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())},
+ {__.V().id(), __.V().id(), Collections.emptyList()},
+ {__.V().local(out()), __.V().local(out()), Collections.emptyList()},
+ {__.V().local(in()), __.V().local(in()), Collections.emptyList()},
+ {__.V().local(out()).count(), __.V().local(out()).count(), Collections.emptyList()},
+ {__.V().local(in()).count(), __.V().local(in()).count(), Collections.emptyList()},
+ {__.V().out().count(), __.V().local(out().id()).count(), Collections.emptyList()},
+ {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()},
+ {__.V().in().id(), __.V().local(in().id()), Collections.emptyList()},
+ {in().id(), __.local(in().id()), Collections.emptyList()}, // test inject
+ {__.V().out().id(), __.V().local(out().id()), Collections.emptyList()},
+ {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()},
+ {__.V().outE().inV().id().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()},
+ {__.V().map(outE().inV()).count(), __.V().local(__.map(outE().inV()).id()).count(), Collections.emptyList()},
+ {__.V().out().map(outE().inV()).count(), __.V().out().map(outE().inV()).count(), Collections.emptyList()},
+ {__.V().outE().map(__.inV()).id().count(), __.V().local(__.outE().map(__.inV()).id()).count(), Collections.emptyList()},
+ {__.V().outE().map(__.inV()).count(), __.V().local(outE().map(__.inV()).id()).count(), Collections.emptyList()},
+ {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()},
+ {__.V().outE().inV().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()},
+ {__.V().out().id().count(), __.V().local(out().id()).count(), Collections.emptyList()},
+ {__.V().in().id().count(), __.V().local(in().id()).count(), Collections.emptyList()},
+ {__.V().in().id().select("id-map").dedup().count(), __.V().local(in().id().select("id-map")).dedup().count(), Collections.emptyList()},
+ {__.V().in().id().groupCount().select(keys).unfold().dedup().count(), __.V().local(in().id()).groupCount().select(keys).unfold().dedup().count(), Collections.emptyList()},
+ {__.V().outE().hasLabel("knows").values("weight"), __.V().local(outE().hasLabel("knows").values("weight")), Collections.emptyList()},
+ {__.V().outE().values("weight").sum(), __.V().local(outE().values("weight")).sum(), Collections.emptyList()},
+ {__.V().inE().values("weight"), __.V().local(inE().values("weight")), Collections.emptyList()},
+ {__.V().inE().values("weight").sum(), __.V().local(inE().values("weight")).sum(), Collections.emptyList()},
+ {__.V().inE().values("weight").sum().dedup().count(), __.V().local(inE().values("weight")).sum().dedup().count(), Collections.emptyList()},
+ {__.V().as("a").out("knows").as("b").select("a", "b"), __.V().as("a").out("knows").as("b").select("a", "b"), Collections.emptyList()},
+ {__.V().out().groupCount("x").cap("x"), __.V().out().groupCount("x").cap("x"), Collections.emptyList()},
+ {__.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), __.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), Collections.emptyList()},
+ {__.V().outE().inV().groupCount(), __.V().outE().inV().groupCount(), Collections.emptyList()},
+ {__.V().outE().inV().groupCount().by("name"), __.V().outE().inV().groupCount().by("name"), Collections.emptyList()},
+ {__.V().inE().id().groupCount(), __.V().local(inE().id()).groupCount(), Collections.emptyList()},
+ {__.V().inE().values("weight").groupCount(), __.V().local(inE().values("weight")).groupCount(), Collections.emptyList()},
+ {__.V().outE().inV().tree(), __.V().outE().inV().tree(), Collections.emptyList()},
+ {__.V().in().values("name").groupCount(), __.V().in().values("name").groupCount(), Collections.emptyList()},
+ {__.V().outE().inV().groupCount("x"), __.V().outE().inV().groupCount("x"), Collections.emptyList()},
+ {__.V().in().dedup().count(), __.V().local(in().id()).dedup().count(), Collections.emptyList()},
+ {__.V().bothE().dedup().count(), __.V().local(bothE().id()).dedup().count(), Collections.emptyList()},
+ {__.V().bothE().dedup().by("name").count(), __.V().bothE().dedup().by("name").count(), Collections.emptyList()},
+ {__.V().map(mapFunction).inV().count(), __.V().map(mapFunction).inV().count(), Collections.emptyList()},
+ {__.V().groupCount().by(__.out().count()), __.V().groupCount().by(__.out().count()), Collections.emptyList()},
+ {__.V().identity().out().identity().count(), __.V().local(__.identity().out().identity().id()).count(), Collections.emptyList()},
+ {__.V().identity().out().identity().dedup().count(), __.V().local(__.identity().out().identity().id()).dedup().count(), Collections.emptyList()},
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
deleted file mode 100644
index 081a541..0000000
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
-
-import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
-import org.apache.tinkerpop.gremlin.structure.Column;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.function.Function;
-
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.bothE;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
-import static org.apache.tinkerpop.gremlin.structure.Column.keys;
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(Parameterized.class)
-public class SingleIterationStrategyTest {
-
- @Parameterized.Parameter(value = 0)
- public Traversal original;
-
- @Parameterized.Parameter(value = 1)
- public Traversal optimized;
-
- @Parameterized.Parameter(value = 2)
- public Collection<TraversalStrategy> otherStrategies;
-
- @Test
- public void doTest() {
- final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>();
- final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin());
- rootTraversal.addStep(parent.asStep());
- parent.setComputerTraversal(this.original.asAdmin());
- final TraversalStrategies strategies = new DefaultTraversalStrategies();
- strategies.addStrategies(SingleIterationStrategy.instance());
- for (final TraversalStrategy strategy : this.otherStrategies) {
- strategies.addStrategies(strategy);
- }
- rootTraversal.setStrategies(strategies);
- rootTraversal.asAdmin().applyStrategies();
- assertEquals(this.optimized, parent.computerTraversal.get());
- }
-
-
- @Parameterized.Parameters(name = "{0}")
- public static Iterable<Object[]> generateTestParameters() {
- final Function<Traverser<Vertex>,Vertex> mapFunction = t -> t.get().vertices(Direction.OUT).next();
- return Arrays.asList(new Object[][]{
- {__.V().out().count(), __.V().local(out().id()).count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())}, // TODO
- {__.V().id(), __.V().id(), Collections.emptyList()},
- {__.V().out().count(), __.V().local(out().id()).count(), Collections.emptyList()},
- {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()},
- {__.V().in().id(), __.V().local(in().id()), Collections.emptyList()},
- {in().id(), __.local(in().id()), Collections.emptyList()}, // test inject
- {__.V().out().id(), __.V().local(out().id()), Collections.emptyList()},
- {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()},
- {__.V().outE().inV().id().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()},
- {__.V().map(outE().inV()).count(), __.V().local(__.map(outE().inV()).id()).count(), Collections.emptyList()},
- {__.V().out().map(outE().inV()).count(), __.V().out().map(outE().inV()).count(), Collections.emptyList()},
- {__.V().outE().map(__.inV()).id().count(), __.V().local(__.outE().map(__.inV()).id()).count(), Collections.emptyList()},
- {__.V().outE().map(__.inV()).count(), __.V().local(outE().map(__.inV()).id()).count(), Collections.emptyList()},
- {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()},
- {__.V().outE().inV().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()},
- {__.V().out().id().count(), __.V().local(out().id()).count(), Collections.emptyList()},
- {__.V().in().id().count(), __.V().local(in().id()).count(), Collections.emptyList()},
- {__.V().in().id().select("id-map").dedup().count(), __.V().local(in().id().select("id-map")).dedup().count(), Collections.emptyList()},
- {__.V().in().id().groupCount().select(keys).unfold().dedup().count(), __.V().local(in().id()).groupCount().select(keys).unfold().dedup().count(), Collections.emptyList()},
- {__.V().outE().hasLabel("knows").values("weight"), __.V().local(outE().hasLabel("knows").values("weight")), Collections.emptyList()},
- {__.V().outE().values("weight").sum(), __.V().local(outE().values("weight")).sum(), Collections.emptyList()},
- {__.V().inE().values("weight"), __.V().local(inE().values("weight")), Collections.emptyList()},
- {__.V().inE().values("weight").sum(), __.V().local(inE().values("weight")).sum(), Collections.emptyList()},
- {__.V().inE().values("weight").sum().dedup().count(), __.V().local(inE().values("weight")).sum().dedup().count(), Collections.emptyList()},
- {__.V().as("a").out("knows").as("b").select("a", "b"), __.V().as("a").out("knows").as("b").select("a", "b"), Collections.emptyList()},
- {__.V().out().groupCount("x").cap("x"), __.V().out().groupCount("x").cap("x"), Collections.emptyList()},
- {__.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), __.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), Collections.emptyList()},
- {__.V().outE().inV().groupCount(), __.V().outE().inV().groupCount(), Collections.emptyList()},
- {__.V().outE().inV().groupCount().by("name"), __.V().outE().inV().groupCount().by("name"), Collections.emptyList()},
- {__.V().inE().id().groupCount(), __.V().local(inE().id()).groupCount(), Collections.emptyList()},
- {__.V().inE().values("weight").groupCount(), __.V().local(inE().values("weight")).groupCount(), Collections.emptyList()},
- {__.V().outE().inV().tree(), __.V().outE().inV().tree(), Collections.emptyList()},
- {__.V().in().values("name").groupCount(), __.V().in().values("name").groupCount(), Collections.emptyList()},
- {__.V().outE().inV().groupCount("x"), __.V().outE().inV().groupCount("x"), Collections.emptyList()},
- {__.V().in().dedup().count(), __.V().local(in().id()).dedup().count(), Collections.emptyList()},
- {__.V().bothE().dedup().count(), __.V().local(bothE().id()).dedup().count(), Collections.emptyList()},
- {__.V().bothE().dedup().by("name").count(), __.V().bothE().dedup().by("name").count(), Collections.emptyList()},
- {__.V().map(mapFunction).inV().count(), __.V().map(mapFunction).inV().count(), Collections.emptyList()},
- {__.V().groupCount().by(__.out().count()),__.V().groupCount().by(__.out().count()),Collections.emptyList()}
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
index 1288b0d..9e85df6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
@@ -21,23 +21,15 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.o
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy;
import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -83,9 +75,9 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
}
}
if (!doesMessagePass &&
- !SparkSingleIterationStrategy.endsWithInElement(computerTraversal) &&
- !(computerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) || // todo: remove this when dynamic detachment is available in 3.3.0
- computerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH))) { // todo: remove this when dynamic detachment is available in 3.3.0
+ !MessagePassingReductionStrategy.endsWithElement(computerTraversal.getEndStep()) &&
+ !(computerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) || // TODO: remove this when dynamic detachment is available in 3.3.0
+ computerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH))) { // TODO: remove this when dynamic detachment is available in 3.3.0
step.setComputer(step.getComputer()
// if no message passing, don't partition the loaded graph
.configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, true)
@@ -95,34 +87,8 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
}
}
- private static final boolean endsWithInElement(final Traversal.Admin<?, ?> traversal) {
- Step<?, ?> current = traversal.getEndStep();
- while (current instanceof Barrier) { // clip off any tail barriers
- current = current.getPreviousStep();
- }
- while (!(current instanceof EmptyStep)) {
- if ((current instanceof VertexStep && (((VertexStep) current).returnsVertex() ||
- !((VertexStep) current).getDirection().equals(Direction.OUT))) ||
- current instanceof EdgeVertexStep) {
- return true;
- } else if (current instanceof TraversalMapStep || current instanceof TraversalFlatMapStep || current instanceof LocalStep) {
- if (endsWithInElement(((TraversalParent) current).getLocalChildren().get(0)))
- return true;
- } else if (current instanceof TraversalParent) {
- if (((TraversalParent) current).getGlobalChildren().stream().filter(SparkSingleIterationStrategy::endsWithInElement).findAny().isPresent())
- return true;
- }
- if (!(current instanceof FilterStep || current instanceof SideEffectStep)) { // side-effects and filters do not alter the mapping and thus, deeper analysis is required
- return false;
- }
- current = current.getPreviousStep();
- }
- return false;
- }
-
public static SparkSingleIterationStrategy instance() {
return INSTANCE;
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
index 4297649..8f97576 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
@@ -24,7 +24,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -67,11 +67,11 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
/////////// WITHOUT SINGLE-ITERATION STRATEGY LESS SINGLE-PASS OPTIONS ARE AVAILABLE
Graph graph = GraphFactory.open(configuration);
- GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, SingleIterationStrategy.class);
+ GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, MessagePassingReductionStrategy.class);
assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent());
- assertFalse(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
- assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
+ assertFalse(g.getStrategies().toList().contains(MessagePassingReductionStrategy.instance()));
+ assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof MessagePassingReductionStrategy).findAny().isPresent());
assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent());
@@ -95,11 +95,11 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
/////////// WITH SINGLE-ITERATION STRATEGY MORE SINGLE-PASS OPTIONS ARE AVAILABLE
graph = GraphFactory.open(configuration);
- g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(SingleIterationStrategy.instance());
+ g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(MessagePassingReductionStrategy.instance());
assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent());
- assertTrue(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
- assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
+ assertTrue(g.getStrategies().toList().contains(MessagePassingReductionStrategy.instance()));
+ assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof MessagePassingReductionStrategy).findAny().isPresent());
assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent());
@@ -114,6 +114,8 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
test(true, g.V().bothE().values("weight").limit(2));
test(true, 6L, g.V().count());
test(true, 6L, g.V().id().count());
+ test(true, 6L, g.V().identity().outE().identity().count());
+ test(true, 6L, g.V().identity().outE().has("weight").count());
test(true, 6L, g.V().out().count());
test(true, 6L, g.V().outE().inV().count());
test(true, 6L, g.V().outE().inV().id().count());
@@ -167,6 +169,7 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
test(false, g.V().out().groupCount("x").cap("x"));
test(false, 6L, g.V().both().groupCount("x").cap("x").select(keys).unfold().count());
test(false, g.V().outE().inV().groupCount());
+ test(false, g.V().outE().unfold().inV().groupCount());
test(false, g.V().outE().inV().groupCount().by("name"));
test(false, g.V().outE().inV().tree());
test(false, g.V().outE().inV().id().tree());