You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/27 18:30:20 UTC

tinkerpop git commit: realize that we can generalized SingleIterationStrategy to 1) work with GraphActors and 2) work at every walk-step. Thus, given that GraphActors isn't about Iterations and we can make this work for multi-stages, I renamed the strate

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1617 0213907a9 -> 6ec8d0994


realize that we can generalized SingleIterationStrategy to 1) work with GraphActors and 2) work at every walk-step. Thus, given that GraphActors isn't about Iterations and we can make this work for multi-stages, I renamed the strategy MessagePassingReductionStrategy. This way we won't have to name change in the future. Also, I did some more code cleanups and added a bunch more test cases. This is perhaps the most tested strategy.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/6ec8d099
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/6ec8d099
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/6ec8d099

Branch: refs/heads/TINKERPOP-1617
Commit: 6ec8d0994057a9fc6d0ed66e520715d6050b94f9
Parents: 0213907
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Jan 27 11:30:15 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 27 11:30:15 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   2 +-
 .../MessagePassingReductionStrategy.java        | 162 +++++++++++++++++++
 .../optimization/SingleIterationStrategy.java   | 151 -----------------
 .../process/traversal/TraversalStrategies.java  |   4 +-
 .../MessagePassingReductionStrategyTest.java    | 135 ++++++++++++++++
 .../SingleIterationStrategyTest.java            | 128 ---------------
 .../SparkSingleIterationStrategy.java           |  42 +----
 .../SparkSingleIterationStrategyTest.java       |  17 +-
 8 files changed, 314 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6ec8d099/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 70d6cd5..f499f38 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,7 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-* Added `SingleIterationStrategy` as a default `GraphComputer` strategy that can re-write some traversals to not require message passing.
+* Added default `MessagePassingReductionStrategy` for `GraphComputer` that can reduce the number of message passing iterations.
 * Fixed a bug associated with user-provided maps and `GroupSideEffectStep`.
 * `GroupBiOperator` no longer maintains a detached traversal and thus, no more side-effect related OLAP inconsistencies.
 * Added `ProjectedTraverser` which wraps a traverser with a `List<Object>` of projected data.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6ec8d099/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java
new file mode 100644
index 0000000..cff152e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.IdStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.IdentityStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MessagePassingReductionStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy {
+
+    private static final MessagePassingReductionStrategy INSTANCE = new MessagePassingReductionStrategy();
+
+    private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList(
+            IncidentToAdjacentStrategy.class,
+            AdjacentToIncidentStrategy.class,
+            FilterRankingStrategy.class,
+            InlineFilterStrategy.class));
+
+    private MessagePassingReductionStrategy() {
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        // only process the first traversal step in an OLAP chain
+        TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> {
+            final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined
+            final Traversal.Admin<?, ?> compiledComputerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone();
+            if (!compiledComputerTraversal.isLocked())
+                compiledComputerTraversal.applyStrategies();
+            if (!TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(LocalStep.class, LambdaHolder.class), compiledComputerTraversal) && // don't do anything with lambdas or locals as this leads to unknown adjacencies
+                    !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH) &&            // when dynamic detachment is provided in 3.3.0, remove this (TODO)
+                    !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) &&    // when dynamic detachment is provided in 3.3.0, remove this (TODO)
+                    !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, compiledComputerTraversal).          // this is a strict precaution that could be loosed with deeper logic on barriers in global children
+                            stream().
+                            filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) {
+                final Traversal.Admin<?, ?> computerTraversal = step.computerTraversal.get().clone();
+                // apply the strategies up to this point
+                final List<TraversalStrategy<?>> strategies = step.getTraversal().getStrategies().toList();
+                final int indexOfStrategy = strategies.indexOf(MessagePassingReductionStrategy.instance());
+                if (indexOfStrategy > 0)
+                    TraversalHelper.applySingleLevelStrategies(step.getTraversal(), computerTraversal, strategies.get(indexOfStrategy - 1).getClass());
+                if (computerTraversal.getSteps().size() > 1 &&
+                        !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) &&
+                        TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(VertexStep.class, EdgeVertexStep.class), computerTraversal) &&
+                        TraversalHelper.isLocalStarGraph(computerTraversal)) {
+                    final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, computerTraversal).orElse(null);
+                    if (MessagePassingReductionStrategy.insertElementId(barrier)) // out().count() -> out().id().count()
+                        TraversalHelper.insertBeforeStep(new IdStep(computerTraversal), barrier, computerTraversal);
+                    if (!(MessagePassingReductionStrategy.endsWithElement(null == barrier ? computerTraversal.getEndStep() : barrier))) {
+                        Traversal.Admin newChildTraversal = new DefaultGraphTraversal<>();
+                        TraversalHelper.removeToTraversal(computerTraversal.getStartStep() instanceof GraphStep ?
+                                computerTraversal.getStartStep().getNextStep() :
+                                (Step) computerTraversal.getStartStep(), null == barrier ?
+                                EmptyStep.instance() :
+                                barrier, newChildTraversal);
+                        newChildTraversal = newChildTraversal.getSteps().size() > 1 ? (Traversal.Admin) __.local(newChildTraversal) : newChildTraversal;
+                        if (null == barrier)
+                            TraversalHelper.insertTraversal(0, newChildTraversal, computerTraversal);
+                        else
+                            TraversalHelper.insertTraversal(barrier.getPreviousStep(), newChildTraversal, computerTraversal);
+                    }
+                }
+                step.setComputerTraversal(computerTraversal);
+            }
+        });
+    }
+
+    private static boolean insertElementId(final Step<?, ?> barrier) {
+        if (!(barrier instanceof Barrier))
+            return false;
+        else if (!endsWithElement(barrier.getPreviousStep()))
+            return false;
+        else if (barrier instanceof CountGlobalStep)
+            return true;
+        else if (barrier instanceof DedupGlobalStep &&
+                ((DedupGlobalStep) barrier).getScopeKeys().isEmpty() &&
+                ((DedupGlobalStep) barrier).getLocalChildren().isEmpty() &&
+                barrier.getNextStep() instanceof CountGlobalStep)
+            return true;
+        else
+            return false;
+    }
+
+    public static final boolean endsWithElement(Step<?, ?> currentStep) {
+        while (!(currentStep instanceof EmptyStep)) {
+            if (currentStep instanceof VertexStep) // only inE, in, and out send messages
+                return (((VertexStep) currentStep).returnsVertex() || !((VertexStep) currentStep).getDirection().equals(Direction.OUT));
+            else if (currentStep instanceof EdgeVertexStep) // TODO: add GraphStep but only if its mid-traversal V()/E()
+                return true;
+            else if (currentStep instanceof TraversalFlatMapStep || currentStep instanceof TraversalMapStep || currentStep instanceof LocalStep)
+                return endsWithElement(((TraversalParent) currentStep).getLocalChildren().get(0).getEndStep());
+            else if (!(currentStep instanceof FilterStep || currentStep instanceof SideEffectStep || currentStep instanceof IdentityStep || currentStep instanceof Barrier))
+                return false;
+            currentStep = currentStep.getPreviousStep();
+        }
+        return false;
+    }
+
+    @Override
+    public Set<Class<? extends OptimizationStrategy>> applyPrior() {
+        return PRIORS;
+    }
+
+    public static MessagePassingReductionStrategy instance() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6ec8d099/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
deleted file mode 100644
index 6b509ef..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
-
-import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
-import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.IdStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SingleIterationStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy {
-
-    private static final SingleIterationStrategy INSTANCE = new SingleIterationStrategy();
-
-    private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList(
-            IncidentToAdjacentStrategy.class,
-            AdjacentToIncidentStrategy.class,
-            FilterRankingStrategy.class,
-            InlineFilterStrategy.class));
-
-    private SingleIterationStrategy() {
-    }
-
-    @Override
-    public void apply(final Traversal.Admin<?, ?> traversal) {
-        // only process the first traversal step in an OLAP chain
-        TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> {
-            final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined
-            final Traversal.Admin<?, ?> compiledComputerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone();
-            if (!compiledComputerTraversal.isLocked())
-                compiledComputerTraversal.applyStrategies();
-            if (!TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(LocalStep.class, LambdaHolder.class), compiledComputerTraversal) &&
-                    !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH) &&            // when dynamic detachment is provided in 3.3.0, remove this (TODO)
-                    !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) &&    // when dynamic detachment is provided in 3.3.0, remove this (TODO)
-                    !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, compiledComputerTraversal).          // this is a strict precaution that could be loosed with deeper logic on barriers in global children
-                            stream().
-                            filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) {
-                final Traversal.Admin<?, ?> computerTraversal = step.computerTraversal.getPure();
-                if (computerTraversal.getSteps().size() > 1 &&
-                        !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) &&
-                        TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(VertexStep.class, EdgeVertexStep.class), computerTraversal) &&
-                        TraversalHelper.isLocalStarGraph(computerTraversal)) {
-                    final Traversal.Admin newChildTraversal = new DefaultGraphTraversal<>();
-                    final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, computerTraversal).orElse(null);
-                    if (SingleIterationStrategy.insertElementId(barrier)) // out().count() -> out().id().count()
-                        TraversalHelper.insertBeforeStep(new IdStep(computerTraversal), barrier, computerTraversal);
-                    if (!(SingleIterationStrategy.endsWithElement(null == barrier ? computerTraversal.getEndStep() : barrier.getPreviousStep()))) {
-                        TraversalHelper.removeToTraversal(computerTraversal.getStartStep() instanceof GraphStep ?
-                                computerTraversal.getStartStep().getNextStep() :
-                                (Step) computerTraversal.getStartStep(), null == barrier ?
-                                EmptyStep.instance() :
-                                barrier, newChildTraversal);
-                        if (null == barrier)
-                            TraversalHelper.insertTraversal(0, (Traversal.Admin) __.local(newChildTraversal), computerTraversal);
-                        else
-                            TraversalHelper.insertTraversal(barrier.getPreviousStep(), (Traversal.Admin) __.local(newChildTraversal), computerTraversal);
-                    }
-                }
-                step.setComputerTraversal(computerTraversal);
-            }
-        });
-    }
-
-    private static boolean insertElementId(final Step<?, ?> barrier) {
-        if (!(barrier instanceof Barrier))
-            return false;
-        else if (!endsWithElement(barrier.getPreviousStep()))
-            return false;
-        else if (barrier instanceof CountGlobalStep)
-            return true;
-        else if (barrier instanceof DedupGlobalStep &&
-                ((DedupGlobalStep) barrier).getScopeKeys().isEmpty() &&
-                ((DedupGlobalStep) barrier).getLocalChildren().isEmpty() &&
-                barrier.getNextStep() instanceof CountGlobalStep)
-            return true;
-        else
-            return false;
-    }
-
-    private static boolean endsWithElement(Step<?, ?> currentStep) {
-        while (!(currentStep instanceof EmptyStep)) {
-            if (currentStep instanceof VertexStep || currentStep instanceof EdgeVertexStep) // TODO: add GraphStep but only if its mid-traversal V()/E()
-                return true;
-            else if (currentStep instanceof TraversalFlatMapStep || currentStep instanceof TraversalMapStep)
-                return endsWithElement(((TraversalParent) currentStep).getLocalChildren().get(0).getEndStep());
-            else if (!(currentStep instanceof FilterStep || currentStep instanceof SideEffectStep))
-                return false;
-            currentStep = currentStep.getPreviousStep();
-        }
-        return false;
-    }
-
-    @Override
-    public Set<Class<? extends OptimizationStrategy>> applyPrior() {
-        return PRIORS;
-    }
-
-    public static SingleIterationStrategy instance() {
-        return INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6ec8d099/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
index a1cb99c..7f96850 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
@@ -20,7 +20,7 @@ package org.apache.tinkerpop.gremlin.process.traversal;
 
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
@@ -223,7 +223,7 @@ public interface TraversalStrategies extends Serializable, Cloneable {
             final TraversalStrategies graphComputerStrategies = new DefaultTraversalStrategies();
             graphComputerStrategies.addStrategies(
                     GraphFilterStrategy.instance(),
-                    SingleIterationStrategy.instance(),
+                    MessagePassingReductionStrategy.instance(),
                     OrderLimitStrategy.instance(),
                     PathProcessorStrategy.instance(),
                     ComputerVerificationStrategy.instance());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6ec8d099/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java
new file mode 100644
index 0000000..1171dcd
--- /dev/null
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java
@@ -0,0 +1,135 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.bothE;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
+import static org.apache.tinkerpop.gremlin.structure.Column.keys;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(Parameterized.class)
+public class MessagePassingReductionStrategyTest {
+
+    @Parameterized.Parameter(value = 0)
+    public Traversal original;
+
+    @Parameterized.Parameter(value = 1)
+    public Traversal optimized;
+
+    @Parameterized.Parameter(value = 2)
+    public Collection<TraversalStrategy> otherStrategies;
+
+    @Test
+    public void doTest() {
+        final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>();
+        final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin());
+        rootTraversal.addStep(parent.asStep());
+        parent.setComputerTraversal(this.original.asAdmin());
+        final TraversalStrategies strategies = new DefaultTraversalStrategies();
+        strategies.addStrategies(MessagePassingReductionStrategy.instance());
+        for (final TraversalStrategy strategy : this.otherStrategies) {
+            strategies.addStrategies(strategy);
+        }
+        rootTraversal.setStrategies(strategies);
+        rootTraversal.asAdmin().applyStrategies();
+        assertEquals(this.optimized, parent.computerTraversal.get());
+    }
+
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Iterable<Object[]> generateTestParameters() {
+        final Function<Traverser<Vertex>, Vertex> mapFunction = t -> t.get().vertices(Direction.OUT).next();
+        return Arrays.asList(new Object[][]{
+                {__.V().out().count(), __.V().outE().count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())},
+                {__.V().in().count(), __.V().local(__.inE().id()).count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())},
+                {__.V().id(), __.V().id(), Collections.emptyList()},
+                {__.V().local(out()), __.V().local(out()), Collections.emptyList()},
+                {__.V().local(in()), __.V().local(in()), Collections.emptyList()},
+                {__.V().local(out()).count(), __.V().local(out()).count(), Collections.emptyList()},
+                {__.V().local(in()).count(), __.V().local(in()).count(), Collections.emptyList()},
+                {__.V().out().count(), __.V().local(out().id()).count(), Collections.emptyList()},
+                {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()},
+                {__.V().in().id(), __.V().local(in().id()), Collections.emptyList()},
+                {in().id(), __.local(in().id()), Collections.emptyList()}, // test inject
+                {__.V().out().id(), __.V().local(out().id()), Collections.emptyList()},
+                {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()},
+                {__.V().outE().inV().id().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()},
+                {__.V().map(outE().inV()).count(), __.V().local(__.map(outE().inV()).id()).count(), Collections.emptyList()},
+                {__.V().out().map(outE().inV()).count(), __.V().out().map(outE().inV()).count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).id().count(), __.V().local(__.outE().map(__.inV()).id()).count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).count(), __.V().local(outE().map(__.inV()).id()).count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()},
+                {__.V().outE().inV().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()},
+                {__.V().out().id().count(), __.V().local(out().id()).count(), Collections.emptyList()},
+                {__.V().in().id().count(), __.V().local(in().id()).count(), Collections.emptyList()},
+                {__.V().in().id().select("id-map").dedup().count(), __.V().local(in().id().select("id-map")).dedup().count(), Collections.emptyList()},
+                {__.V().in().id().groupCount().select(keys).unfold().dedup().count(), __.V().local(in().id()).groupCount().select(keys).unfold().dedup().count(), Collections.emptyList()},
+                {__.V().outE().hasLabel("knows").values("weight"), __.V().local(outE().hasLabel("knows").values("weight")), Collections.emptyList()},
+                {__.V().outE().values("weight").sum(), __.V().local(outE().values("weight")).sum(), Collections.emptyList()},
+                {__.V().inE().values("weight"), __.V().local(inE().values("weight")), Collections.emptyList()},
+                {__.V().inE().values("weight").sum(), __.V().local(inE().values("weight")).sum(), Collections.emptyList()},
+                {__.V().inE().values("weight").sum().dedup().count(), __.V().local(inE().values("weight")).sum().dedup().count(), Collections.emptyList()},
+                {__.V().as("a").out("knows").as("b").select("a", "b"), __.V().as("a").out("knows").as("b").select("a", "b"), Collections.emptyList()},
+                {__.V().out().groupCount("x").cap("x"), __.V().out().groupCount("x").cap("x"), Collections.emptyList()},
+                {__.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), __.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), Collections.emptyList()},
+                {__.V().outE().inV().groupCount(), __.V().outE().inV().groupCount(), Collections.emptyList()},
+                {__.V().outE().inV().groupCount().by("name"), __.V().outE().inV().groupCount().by("name"), Collections.emptyList()},
+                {__.V().inE().id().groupCount(), __.V().local(inE().id()).groupCount(), Collections.emptyList()},
+                {__.V().inE().values("weight").groupCount(), __.V().local(inE().values("weight")).groupCount(), Collections.emptyList()},
+                {__.V().outE().inV().tree(), __.V().outE().inV().tree(), Collections.emptyList()},
+                {__.V().in().values("name").groupCount(), __.V().in().values("name").groupCount(), Collections.emptyList()},
+                {__.V().outE().inV().groupCount("x"), __.V().outE().inV().groupCount("x"), Collections.emptyList()},
+                {__.V().in().dedup().count(), __.V().local(in().id()).dedup().count(), Collections.emptyList()},
+                {__.V().bothE().dedup().count(), __.V().local(bothE().id()).dedup().count(), Collections.emptyList()},
+                {__.V().bothE().dedup().by("name").count(), __.V().bothE().dedup().by("name").count(), Collections.emptyList()},
+                {__.V().map(mapFunction).inV().count(), __.V().map(mapFunction).inV().count(), Collections.emptyList()},
+                {__.V().groupCount().by(__.out().count()), __.V().groupCount().by(__.out().count()), Collections.emptyList()},
+                {__.V().identity().out().identity().count(), __.V().local(__.identity().out().identity().id()).count(), Collections.emptyList()},
+                {__.V().identity().out().identity().dedup().count(), __.V().local(__.identity().out().identity().id()).dedup().count(), Collections.emptyList()},
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6ec8d099/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
deleted file mode 100644
index 081a541..0000000
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
-
-import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
-import org.apache.tinkerpop.gremlin.structure.Column;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.function.Function;
-
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.bothE;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
-import static org.apache.tinkerpop.gremlin.structure.Column.keys;
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(Parameterized.class)
-public class SingleIterationStrategyTest {
-
-    @Parameterized.Parameter(value = 0)
-    public Traversal original;
-
-    @Parameterized.Parameter(value = 1)
-    public Traversal optimized;
-
-    @Parameterized.Parameter(value = 2)
-    public Collection<TraversalStrategy> otherStrategies;
-
-    @Test
-    public void doTest() {
-        final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>();
-        final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin());
-        rootTraversal.addStep(parent.asStep());
-        parent.setComputerTraversal(this.original.asAdmin());
-        final TraversalStrategies strategies = new DefaultTraversalStrategies();
-        strategies.addStrategies(SingleIterationStrategy.instance());
-        for (final TraversalStrategy strategy : this.otherStrategies) {
-            strategies.addStrategies(strategy);
-        }
-        rootTraversal.setStrategies(strategies);
-        rootTraversal.asAdmin().applyStrategies();
-        assertEquals(this.optimized, parent.computerTraversal.get());
-    }
-
-
-    @Parameterized.Parameters(name = "{0}")
-    public static Iterable<Object[]> generateTestParameters() {
-        final Function<Traverser<Vertex>,Vertex> mapFunction = t -> t.get().vertices(Direction.OUT).next();
-        return Arrays.asList(new Object[][]{
-                {__.V().out().count(), __.V().local(out().id()).count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())}, // TODO
-                {__.V().id(), __.V().id(), Collections.emptyList()},
-                {__.V().out().count(), __.V().local(out().id()).count(), Collections.emptyList()},
-                {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()},
-                {__.V().in().id(), __.V().local(in().id()), Collections.emptyList()},
-                {in().id(), __.local(in().id()), Collections.emptyList()}, // test inject
-                {__.V().out().id(), __.V().local(out().id()), Collections.emptyList()},
-                {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()},
-                {__.V().outE().inV().id().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()},
-                {__.V().map(outE().inV()).count(), __.V().local(__.map(outE().inV()).id()).count(), Collections.emptyList()},
-                {__.V().out().map(outE().inV()).count(), __.V().out().map(outE().inV()).count(), Collections.emptyList()},
-                {__.V().outE().map(__.inV()).id().count(), __.V().local(__.outE().map(__.inV()).id()).count(), Collections.emptyList()},
-                {__.V().outE().map(__.inV()).count(), __.V().local(outE().map(__.inV()).id()).count(), Collections.emptyList()},
-                {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()},
-                {__.V().outE().inV().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()},
-                {__.V().out().id().count(), __.V().local(out().id()).count(), Collections.emptyList()},
-                {__.V().in().id().count(), __.V().local(in().id()).count(), Collections.emptyList()},
-                {__.V().in().id().select("id-map").dedup().count(), __.V().local(in().id().select("id-map")).dedup().count(), Collections.emptyList()},
-                {__.V().in().id().groupCount().select(keys).unfold().dedup().count(), __.V().local(in().id()).groupCount().select(keys).unfold().dedup().count(), Collections.emptyList()},
-                {__.V().outE().hasLabel("knows").values("weight"), __.V().local(outE().hasLabel("knows").values("weight")), Collections.emptyList()},
-                {__.V().outE().values("weight").sum(), __.V().local(outE().values("weight")).sum(), Collections.emptyList()},
-                {__.V().inE().values("weight"), __.V().local(inE().values("weight")), Collections.emptyList()},
-                {__.V().inE().values("weight").sum(), __.V().local(inE().values("weight")).sum(), Collections.emptyList()},
-                {__.V().inE().values("weight").sum().dedup().count(), __.V().local(inE().values("weight")).sum().dedup().count(), Collections.emptyList()},
-                {__.V().as("a").out("knows").as("b").select("a", "b"), __.V().as("a").out("knows").as("b").select("a", "b"), Collections.emptyList()},
-                {__.V().out().groupCount("x").cap("x"), __.V().out().groupCount("x").cap("x"), Collections.emptyList()},
-                {__.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), __.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), Collections.emptyList()},
-                {__.V().outE().inV().groupCount(), __.V().outE().inV().groupCount(), Collections.emptyList()},
-                {__.V().outE().inV().groupCount().by("name"), __.V().outE().inV().groupCount().by("name"), Collections.emptyList()},
-                {__.V().inE().id().groupCount(), __.V().local(inE().id()).groupCount(), Collections.emptyList()},
-                {__.V().inE().values("weight").groupCount(), __.V().local(inE().values("weight")).groupCount(), Collections.emptyList()},
-                {__.V().outE().inV().tree(), __.V().outE().inV().tree(), Collections.emptyList()},
-                {__.V().in().values("name").groupCount(), __.V().in().values("name").groupCount(), Collections.emptyList()},
-                {__.V().outE().inV().groupCount("x"), __.V().outE().inV().groupCount("x"), Collections.emptyList()},
-                {__.V().in().dedup().count(), __.V().local(in().id()).dedup().count(), Collections.emptyList()},
-                {__.V().bothE().dedup().count(), __.V().local(bothE().id()).dedup().count(), Collections.emptyList()},
-                {__.V().bothE().dedup().by("name").count(), __.V().bothE().dedup().by("name").count(), Collections.emptyList()},
-                {__.V().map(mapFunction).inV().count(), __.V().map(mapFunction).inV().count(), Collections.emptyList()},
-                {__.V().groupCount().by(__.out().count()),__.V().groupCount().by(__.out().count()),Collections.emptyList()}
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6ec8d099/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
index 1288b0d..9e85df6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
@@ -21,23 +21,15 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.o
 
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -83,9 +75,9 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
                 }
             }
             if (!doesMessagePass &&
-                    !SparkSingleIterationStrategy.endsWithInElement(computerTraversal) &&
-                    !(computerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) || // todo: remove this when dynamic detachment is available in 3.3.0
-                            computerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH))) {  // todo: remove this when dynamic detachment is available in 3.3.0
+                    !MessagePassingReductionStrategy.endsWithElement(computerTraversal.getEndStep()) &&
+                    !(computerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) || // TODO: remove this when dynamic detachment is available in 3.3.0
+                            computerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH))) {  // TODO: remove this when dynamic detachment is available in 3.3.0
                 step.setComputer(step.getComputer()
                         // if no message passing, don't partition the loaded graph
                         .configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, true)
@@ -95,34 +87,8 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
         }
     }
 
-    private static final boolean endsWithInElement(final Traversal.Admin<?, ?> traversal) {
-        Step<?, ?> current = traversal.getEndStep();
-        while (current instanceof Barrier) { // clip off any tail barriers
-            current = current.getPreviousStep();
-        }
-        while (!(current instanceof EmptyStep)) {
-            if ((current instanceof VertexStep && (((VertexStep) current).returnsVertex() ||
-                    !((VertexStep) current).getDirection().equals(Direction.OUT))) ||
-                    current instanceof EdgeVertexStep) {
-                return true;
-            } else if (current instanceof TraversalMapStep || current instanceof TraversalFlatMapStep || current instanceof LocalStep) {
-                if (endsWithInElement(((TraversalParent) current).getLocalChildren().get(0)))
-                    return true;
-            } else if (current instanceof TraversalParent) {
-                if (((TraversalParent) current).getGlobalChildren().stream().filter(SparkSingleIterationStrategy::endsWithInElement).findAny().isPresent())
-                    return true;
-            }
-            if (!(current instanceof FilterStep || current instanceof SideEffectStep)) { // side-effects and filters do not alter the mapping and thus, deeper analysis is required
-                return false;
-            }
-            current = current.getPreviousStep();
-        }
-        return false;
-    }
-
     public static SparkSingleIterationStrategy instance() {
         return INSTANCE;
     }
 
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6ec8d099/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
index 4297649..8f97576 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
@@ -24,7 +24,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -67,11 +67,11 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
         /////////// WITHOUT SINGLE-ITERATION STRATEGY LESS SINGLE-PASS OPTIONS ARE AVAILABLE
 
         Graph graph = GraphFactory.open(configuration);
-        GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, SingleIterationStrategy.class);
+        GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, MessagePassingReductionStrategy.class);
         assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
         assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent());
-        assertFalse(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
-        assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
+        assertFalse(g.getStrategies().toList().contains(MessagePassingReductionStrategy.instance()));
+        assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof MessagePassingReductionStrategy).findAny().isPresent());
         assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
         assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent());
 
@@ -95,11 +95,11 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
         /////////// WITH SINGLE-ITERATION STRATEGY MORE SINGLE-PASS OPTIONS ARE AVAILABLE
 
         graph = GraphFactory.open(configuration);
-        g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(SingleIterationStrategy.instance());
+        g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(MessagePassingReductionStrategy.instance());
         assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
         assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent());
-        assertTrue(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
-        assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
+        assertTrue(g.getStrategies().toList().contains(MessagePassingReductionStrategy.instance()));
+        assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof MessagePassingReductionStrategy).findAny().isPresent());
         assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
         assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent());
 
@@ -114,6 +114,8 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
         test(true, g.V().bothE().values("weight").limit(2));
         test(true, 6L, g.V().count());
         test(true, 6L, g.V().id().count());
+        test(true, 6L, g.V().identity().outE().identity().count());
+        test(true, 6L, g.V().identity().outE().has("weight").count());
         test(true, 6L, g.V().out().count());
         test(true, 6L, g.V().outE().inV().count());
         test(true, 6L, g.V().outE().inV().id().count());
@@ -167,6 +169,7 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
         test(false, g.V().out().groupCount("x").cap("x"));
         test(false, 6L, g.V().both().groupCount("x").cap("x").select(keys).unfold().count());
         test(false, g.V().outE().inV().groupCount());
+        test(false, g.V().outE().unfold().inV().groupCount());
         test(false, g.V().outE().inV().groupCount().by("name"));
         test(false, g.V().outE().inV().tree());
         test(false, g.V().outE().inV().id().tree());