You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/27 21:24:45 UTC

[3/6] tinkerpop git commit: Added SingleIterationStrategy, which is able to rewrite a set of traversals to not use message passing in OLAP. This is significant for all GraphComputers as message passing is expensive and, furthermore, for SparkGraphComputer as

Added SingleIterationStrategy, which is able to rewrite a set of traversals to not use message passing in OLAP. This is significant for all GraphComputers as message passing is expensive and, furthermore, for SparkGraphComputer, as without message passing there is no need to cache or partition the graph once it is loaded.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f6b66977
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f6b66977
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f6b66977

Branch: refs/heads/tp32
Commit: f6b669778ec058a555623c6119e0feaaac59c8be
Parents: f0875d7
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 26 12:57:58 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 27 14:24:18 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   3 +-
 .../step/map/TraversalVertexProgramStep.java    |   5 +
 .../optimization/SingleIterationStrategy.java   | 132 +++++++++++++++++++
 .../process/traversal/TraversalStrategies.java  |   2 +
 .../SingleIterationStrategyTest.java            | 100 ++++++++++++++
 .../SparkSingleIterationStrategy.java           |  48 ++++++-
 .../SparkSingleIterationStrategyTest.java       |  89 +++++++++++--
 7 files changed, 362 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 95cfb71..860d401 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,7 +26,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-* Fixed a bug where `keepLabels` were not being pushed down into child traversals by `PathRetractionStrategy`.
+* Fixed a bug where `PathProcessor.keepLabels` were not being pushed down into child traversals by `PathRetractionStrategy`.
+* Added `SingleIterationStrategy` as a default `GraphComputer` strategy that can re-write some traversals to not require message passing.
 * Fixed a bug associated with user-provided maps and `GroupSideEffectStep`.
 * `GroupBiOperator` no longer maintains a detached traversal and thus, no more side-effect related OLAP inconsistencies.
 * Added `ProjectedTraverser` which wraps a traverser with a `List<Object>` of projected data.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
index cb7db29..e866ce2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
@@ -54,6 +54,11 @@ public final class TraversalVertexProgramStep extends VertexProgramStep implemen
         return Collections.singletonList(this.computerTraversal.get());
     }
 
+    /**
+     * Replaces the computer traversal held by this step. The supplied traversal is wrapped in a new
+     * {@code PureTraversal}, and the traversal returned by that wrapper's {@code get()} is integrated as a child of
+     * this step.
+     *
+     * @param computerTraversal the traversal this step should hold from now on
+     */
+    public void setComputerTraversal(final Traversal.Admin<?,?> computerTraversal) {
+        this.computerTraversal = new PureTraversal<>(computerTraversal);
+        this.integrateChild(this.computerTraversal.get());
+    }
+
     @Override
     public String toString() {
         return StringFactory.stepString(this, this.computerTraversal.get(), new GraphFilter(this.computer));

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
new file mode 100644
index 0000000..efcbe9a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
@@ -0,0 +1,132 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@code GraphComputer} optimization that tries to rewrite the computer traversal of the first
+ * {@link TraversalVertexProgramStep} so that it no longer needs message passing. If the traversal would message
+ * pass (it contains an {@link EdgeVertexStep}, a lambda map/flatMap step, or a {@link VertexStep} that returns
+ * vertices or does not use {@link Direction#OUT}) but stays within the local star graph, the steps between the
+ * start step and the first {@link Barrier} (or the end of the traversal) are wrapped in a {@code local()} step.
+ * <p>
+ * No rewrite happens when the traversal holds a {@link LambdaHolder}, is not a local star graph traversal, its
+ * second step is an {@link EmptyStep}, {@link LocalStep} or {@link Barrier}, any top-level {@link TraversalParent}
+ * has global children, the segment to localize ends with a {@link VertexStep} or {@link EdgeVertexStep}, or that
+ * segment consists of a single step.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SingleIterationStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy {
+
+    // singleton; obtain it via instance()
+    private static final SingleIterationStrategy INSTANCE = new SingleIterationStrategy();
+
+    // strategies that must be applied before this one (returned by applyPrior())
+    private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList(
+            IncidentToAdjacentStrategy.class,
+            AdjacentToIncidentStrategy.class,
+            FilterRankingStrategy.class,
+            InlineFilterStrategy.class));
+
+    // step types whose mere presence marks the traversal as message passing; VertexStep is checked separately in apply()
+    private static final Set<Class> MULTI_ITERATION_CLASSES = new HashSet<>(Arrays.asList(
+            EdgeVertexStep.class,
+            LambdaFlatMapStep.class,
+            LambdaMapStep.class
+    ));
+
+
+    // singleton: use instance()
+    private SingleIterationStrategy() {
+    }
+
+    /**
+     * Rewrites the computer traversal of the first {@link TraversalVertexProgramStep} found in {@code traversal}
+     * by wrapping its leading steps in {@code local()} when the conditions described on the class hold; otherwise
+     * the step is left untouched.
+     *
+     * @param traversal the traversal that may contain a {@link TraversalVertexProgramStep}
+     */
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        // only process the first traversal step in an OLAP chain
+        TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> {
+            final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as it's dynamically determined
+            // the analysis below runs on a clone so the step's own traversal is not mutated while inspecting it
+            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone();
+            // does the traversal as it is message pass?
+            boolean doesMessagePass = TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, MULTI_ITERATION_CLASSES, computerTraversal);
+            if (!doesMessagePass) {
+                for (final VertexStep vertexStep : TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, VertexStep.class, computerTraversal)) {
+                    if (vertexStep.returnsVertex() || !vertexStep.getDirection().equals(Direction.OUT)) { // adjacent vertices and in/both edges require message passing in OLAP
+                        doesMessagePass = true;
+                        break;
+                    }
+                }
+            } // if the traversal doesn't message pass, then don't try and localize it as it's just wasted computation
+            if (doesMessagePass) {
+                final boolean beyondStarGraph =
+                        TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, LambdaHolder.class, computerTraversal) ||
+                                !TraversalHelper.isLocalStarGraph(computerTraversal);
+                if (!beyondStarGraph &&                                                                        // if we move beyond the star graph, then localization is not possible.
+                        !(computerTraversal.getStartStep().getNextStep() instanceof EmptyStep) &&              // if it's just a g.V()/E(), then don't localize
+                        !(computerTraversal.getStartStep().getNextStep() instanceof LocalStep) &&              // an already-localized traversal must not be wrapped again (prevents infinite recursive application)
+                        !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) &&                // if the second step is a barrier, no point in trying to localize anything
+                        !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, computerTraversal). // this is a strict precaution that could be loosened with deeper logic on barriers in global children
+                                stream().
+                                filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) {
+
+                    // the rewrite is built from the step's pure traversal rather than from the analysed clone above
+                    final Traversal.Admin<?, ?> newComputerTraversal = step.computerTraversal.getPure();
+                    final Traversal.Admin localTraversal = new DefaultGraphTraversal<>();
+                    // only the steps before the first barrier (or all steps after the start, if there is no barrier) are localized
+                    final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, newComputerTraversal).orElse(null);
+                    final Step endStep = null == barrier ? newComputerTraversal.getEndStep() : barrier.getPreviousStep();
+                    // a segment ending in a VertexStep/EdgeVertexStep is left as is
+                    if (!(endStep instanceof VertexStep || endStep instanceof EdgeVertexStep)) {
+                        // move the segment [second step, barrier) out of the computer traversal into localTraversal
+                        TraversalHelper.removeToTraversal(newComputerTraversal.getStartStep().getNextStep(), null == barrier ? EmptyStep.instance() : barrier, localTraversal);
+                        assert !localTraversal.getSteps().isEmpty(); // given the if() constraints, an empty segment is impossible
+                        if (localTraversal.getSteps().size() > 1) { // if it's just a single step, a local wrap will not alter its locus of computation
+                            // NOTE(review): index 0 is the start step; the local() wrapper is expected to land right after it
+                            // (e.g. V().in().id() -> V().local(in().id())) -- confirm against TraversalHelper.insertTraversal
+                            if (null == barrier)
+                                TraversalHelper.insertTraversal(0, (Traversal.Admin) __.local(localTraversal), newComputerTraversal);
+                            else
+                                TraversalHelper.insertTraversal(barrier.getPreviousStep(), (Traversal.Admin) __.local(localTraversal), newComputerTraversal);
+                            step.setComputerTraversal(newComputerTraversal);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * @return the strategies that must be applied before this one
+     */
+    @Override
+    public Set<Class<? extends OptimizationStrategy>> applyPrior() {
+        return PRIORS;
+    }
+
+    /**
+     * @return the shared instance of this strategy
+     */
+    public static SingleIterationStrategy instance() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
index 015df70..a1cb99c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.process.traversal;
 
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
@@ -222,6 +223,7 @@ public interface TraversalStrategies extends Serializable, Cloneable {
             final TraversalStrategies graphComputerStrategies = new DefaultTraversalStrategies();
             graphComputerStrategies.addStrategies(
                     GraphFilterStrategy.instance(),
+                    SingleIterationStrategy.instance(),
                     OrderLimitStrategy.instance(),
                     PathProcessorStrategy.instance(),
                     ComputerVerificationStrategy.instance());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
new file mode 100644
index 0000000..612fb9d
--- /dev/null
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
@@ -0,0 +1,100 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Parameterized test for {@link SingleIterationStrategy}. Each case wraps an original traversal in a
+ * {@link TraversalVertexProgramStep}, applies {@link SingleIterationStrategy} (plus any listed extra strategies)
+ * and checks that the step's computer traversal equals the expected optimized traversal.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(Parameterized.class)
+public class SingleIterationStrategyTest {
+
+    // traversal handed to the TraversalVertexProgramStep before strategies are applied
+    @Parameterized.Parameter(value = 0)
+    public Traversal original;
+
+    // expected computer traversal after strategies are applied
+    @Parameterized.Parameter(value = 1)
+    public Traversal optimized;
+
+    // additional strategies registered alongside SingleIterationStrategy for this case
+    @Parameterized.Parameter(value = 2)
+    public Collection<TraversalStrategy> otherStrategies;
+
+    @Test
+    public void doTest() {
+        final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>();
+        final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin());
+        rootTraversal.addStep(parent.asStep());
+        // NOTE(review): the constructor above already received this.original; re-setting it here looks redundant -- confirm intent
+        parent.setComputerTraversal(this.original.asAdmin());
+        final TraversalStrategies strategies = new DefaultTraversalStrategies();
+        strategies.addStrategies(SingleIterationStrategy.instance());
+        for (final TraversalStrategy strategy : this.otherStrategies) {
+            strategies.addStrategies(strategy);
+        }
+        rootTraversal.setStrategies(strategies);
+        rootTraversal.asAdmin().applyStrategies();
+        assertEquals(this.optimized, parent.computerTraversal.get());
+    }
+
+
+    // each row: {original traversal, expected computer traversal, extra strategies to register}
+    @Parameterized.Parameters(name = "{0}")
+    public static Iterable<Object[]> generateTestParameters() {
+
+        return Arrays.asList(new Object[][]{
+                {__.V().out().count(), __.V().outE().count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())},
+                {__.V().id(), __.V().id(), Collections.emptyList()},
+                {__.V().out().count(), __.V().out().count(), Collections.emptyList()},
+                {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()},
+                {__.V().in().id(), __.V().local(__.in().id()), Collections.emptyList()},
+                {__.V().out().id(), __.V().local(__.out().id()), Collections.emptyList()},
+                {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()},
+                {__.V().outE().inV().id().count(), __.V().local(__.outE().inV().id()).count(), Collections.emptyList()},
+                {__.V().map(__.outE().inV()).count(), __.V().map(__.outE().inV()).count(), Collections.emptyList()},
+                {__.V().out().map(__.outE().inV()).count(), __.V().out().map(__.outE().inV()).count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).id().count(), __.V().outE().map(__.inV()).id().count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).count(), __.V().outE().map(__.inV()).count(), Collections.emptyList()},
+                {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()},
+                {__.V().outE().inV().count(), __.V().outE().inV().count(), Collections.emptyList()},
+                {__.V().out().id().count(), __.V().local(__.out().id()).count(), Collections.emptyList()},
+                {__.V().in().id().count(), __.V().local(__.in().id()).count(), Collections.emptyList()},
+                {__.V().in().id().select("id-map").dedup().count(), __.V().local(__.in().id().select("id-map")).dedup().count(), Collections.emptyList()},
+                {__.V().outE().values("weight"), __.V().outE().values("weight"), Collections.emptyList()},
+                {__.V().outE().values("weight").sum(), __.V().outE().values("weight").sum(), Collections.emptyList()},
+                {__.V().inE().values("weight"), __.V().local(__.inE().values("weight")), Collections.emptyList()},
+                {__.V().inE().values("weight").sum(), __.V().local(__.inE().values("weight")).sum(), Collections.emptyList()},
+                {__.V().inE().values("weight").sum().dedup().count(), __.V().local(__.inE().values("weight")).sum().dedup().count(), Collections.emptyList()},
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
index a4acf4c..2abb9b8 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
@@ -23,12 +23,23 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectOneStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
@@ -48,9 +59,8 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
 
+    // step types whose mere presence marks the traversal as message passing; VertexStep is checked separately in apply()
     private static final Set<Class> MULTI_ITERATION_CLASSES = new HashSet<>(Arrays.asList(
             EdgeVertexStep.class,
-            LambdaMapStep.class, // maybe?
-            LambdaFlatMapStep.class // maybe?
-            // VertexStep is special as you need to see if the return class is Edge or Vertex (logic below)
+            LambdaMapStep.class,
+            LambdaFlatMapStep.class
     ));
 
     private SparkSingleIterationStrategy() {
@@ -63,6 +73,7 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
             final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone();
             if (!computerTraversal.isLocked())
                 computerTraversal.applyStrategies();
+            ///
             boolean doesMessagePass = TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, MULTI_ITERATION_CLASSES, computerTraversal);
             if (!doesMessagePass) {
                 for (final VertexStep vertexStep : TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, VertexStep.class, computerTraversal)) {
@@ -72,18 +83,45 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
                     }
                 }
             }
-            if (!doesMessagePass) {
+            if (!doesMessagePass && !SparkSingleIterationStrategy.endsWithInElement(computerTraversal)) {
                 step.setComputer(step.getComputer()
                         // if no message passing, don't partition the loaded graph
                         .configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, true)
-                                // if no message passing, don't cache the loaded graph
+                        // if no message passing, don't cache the loaded graph
                         .configure(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, true));
             }
         }
     }
 
+    /**
+     * Walks backwards from the end step of {@code traversal} and reports whether its output is produced by a
+     * {@link VertexStep} that returns vertices or does not use {@link Direction#OUT}, or by an
+     * {@link EdgeVertexStep}.
+     * <p>
+     * {@link TraversalMapStep}, {@link TraversalFlatMapStep} and {@link LocalStep} are checked through their first
+     * local child. Any other {@link TraversalParent} is checked through its global children. Filter, side-effect,
+     * select and barrier steps are walked past. Any other step ends the walk with {@code false}.
+     * <p>
+     * NOTE(review): {@code final} is redundant on a static method, and {@code filter(...).findAny().isPresent()} could
+     * be {@code anyMatch(...)}.
+     *
+     * @param traversal the traversal to inspect
+     * @return {@code true} if such a step determines the traversal's output
+     */
+    private static final boolean endsWithInElement(final Traversal.Admin<?, ?> traversal) {
+        Step<?, ?> current = traversal.getEndStep();
+        while (!(current instanceof EmptyStep)) {
+            if ((current instanceof VertexStep && (((VertexStep) current).returnsVertex() ||
+                    !((VertexStep) current).getDirection().equals(Direction.OUT))) ||
+                    current instanceof EdgeVertexStep) {
+                return true;
+            } else if (current instanceof TraversalMapStep || current instanceof TraversalFlatMapStep || current instanceof LocalStep) {
+                // map-like parents: their output comes from their (single) local child
+                if (endsWithInElement(((TraversalParent) current).getLocalChildren().get(0)))
+                    return true;
+            } else if (current instanceof TraversalParent) {
+                if (((TraversalParent) current).getGlobalChildren().stream().filter(SparkSingleIterationStrategy::endsWithInElement).findAny().isPresent())
+                    return true;
+            }
+            // only steps that pass their input element through (or aggregate it) let the walk continue backwards
+            if (!(current instanceof FilterStep ||
+                    current instanceof SideEffectStep ||
+                    current instanceof SelectStep ||
+                    current instanceof SelectOneStep ||
+                    current instanceof Barrier)) {
+                return false;
+            }
+            current = current.getPreviousStep();
+        }
+        return false;
+    }
+
     public static SparkSingleIterationStrategy instance() {
         return INSTANCE;
     }
 
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
index 4e43438..20596d7 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -32,7 +33,9 @@ import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Column;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
 
@@ -60,29 +63,93 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
 
+        /////////// WITHOUT SINGLE-ITERATION STRATEGY, FEWER SINGLE-PASS OPTIONS ARE AVAILABLE
+
         Graph graph = GraphFactory.open(configuration);
-        GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class);
+        GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, SingleIterationStrategy.class);
         assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
-        assertFalse(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
+        assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent());
+        assertFalse(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
+        assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
         assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
-        assertTrue(g.V().count().explain().toString().contains(SparkSingleIterationStrategy.class.getSimpleName()));
+        assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent());
 
         test(true, g.V().limit(10));
         test(true, g.V().values("age").groupCount());
         test(true, g.V().groupCount().by(__.out().count()));
         test(true, g.V().outE());
-        test(true, 6l, g.V().count());
-        test(true, 6l, g.V().out().count());
-        test(true, 6l, g.V().local(__.inE()).count());
-        test(true, 6l, g.V().outE().inV().count());
+        test(true, 6L, g.V().count());
+        test(true, 6L, g.V().out().count());
+        test(true, 6L, g.V().outE().inV().count());
         ////
+        test(false, 6L, g.V().local(__.inE()).count());
         test(false, g.V().outE().inV());
         test(false, g.V().both());
-        test(false, 12l, g.V().both().count());
+        test(false, 12L, g.V().both().count());
         test(false, g.V().out().id());
-        test(false, 2l, g.V().out().out().count());
-        test(false, 6l, g.V().in().count());
-        test(false, 6l, g.V().inE().count());
+        test(false, 2L, g.V().out().out().count());
+        test(false, 6L, g.V().in().count());
+        test(false, 6L, g.V().inE().count());
+
+        /////////// WITH SINGLE-ITERATION STRATEGY, MORE SINGLE-PASS OPTIONS ARE AVAILABLE
+
+        graph = GraphFactory.open(configuration);
+        g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(SingleIterationStrategy.instance());
+        assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
+        assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent());
+        assertTrue(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
+        assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
+        assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
+        assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent());
+
+        test(true, g.V().limit(10));
+        test(true, g.V().values("age").groupCount());
+        test(true, g.V().groupCount().by(__.out().count()));
+        test(true, g.V().outE());
+        test(true, 6L, g.V().outE().values("weight").count());
+        test(true, 6L, g.V().inE().values("weight").count());
+        test(true, 12L, g.V().bothE().values("weight").count());
+        test(true, g.V().bothE().values("weight"));
+        test(true, g.V().bothE().values("weight").limit(2));
+        test(true, 6L, g.V().count());
+        test(true, 6L, g.V().id().count());
+        test(true, 6L, g.V().out().count());
+        test(true, 6L, g.V().outE().inV().count());
+        test(true, 6L, g.V().outE().inV().id().count());
+        test(true, 2L, g.V().outE().inV().id().groupCount().select(Column.values).unfold().dedup().count());
+        test(true, g.V().out().id());
+        test(true, 6L, g.V().outE().valueMap().count());
+        test(true, g.V().outE().valueMap());
+        test(true, 6L, g.V().inE().valueMap().count());
+        test(true, g.V().inE().valueMap());
+        test(true, 12L, g.V().bothE().valueMap().count());
+        test(true, g.V().bothE().valueMap());
+        test(true, 6L, g.V().inE().id().count());
+        test(true, 6L, g.V().outE().count());
+        test(true, 4L, g.V().outE().inV().id().dedup().count());
+        test(true, 6L, g.V().as("a").outE().inV().as("b").id().dedup("a", "b").by(T.id).count());
+        test(true, 4L, g.V().filter(__.in()).count());
+        test(true, 6L, g.V().sideEffect(__.in()).count());
+        /////
+        test(false, 6L, g.V().local(__.inE()).count());
+        test(false, 6L, g.V().outE().outV().count()); // todo: low probability traversal, but none the less could be optimized for
+        test(false, 6L, g.V().in().count());
+        test(false, 6L, g.V().flatMap(__.in()).count());
+        test(false, 4L, g.V().map(__.in()).count());
+        test(false, 6L, g.V().local(__.in()).count());
+        test(false, 6L, g.V().inE().count());
+        test(false, g.V().outE().inV());
+        test(false, g.V().both());
+        test(false, 12L, g.V().both().count());
+        test(false, g.V().outE().inV().dedup());
+        test(false, 4L, g.V().outE().inV().dedup().count());
+        test(false, 2L, g.V().out().out().count());
+        test(false, 6L, g.V().as("a").map(__.both()).select("a").count());
+        test(false, g.V().out().values("name"));
+        test(false, g.V().out().properties("name"));
+        test(false, g.V().out().valueMap());
+        test(false, 6L, g.V().as("a").outE().inV().values("name").as("b").dedup("a", "b").count());
+        test(false, 2L, g.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count());
     }
 
     private static <R> void test(boolean singleIteration, final Traversal<?, R> traversal) {