You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by rd...@apache.org on 2017/01/27 21:30:11 UTC
[20/29] tinkerpop git commit: Added SingleIterationStrategy which is
able to rewrite a set of traversals to not use message passing in OLAP. This
is significant for all GraphComputers as message passing is expensive and
furthermore, for SparkGraphComputer a
Added SingleIterationStrategy which is able to rewrite a set of traversals to not use message passing in OLAP. This is significant for all GraphComputers as message passing is expensive and, furthermore, for SparkGraphComputer because, without message passing, there is no need to cache or partition the graph once loaded.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f6b66977
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f6b66977
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f6b66977
Branch: refs/heads/TINKERPOP-1602
Commit: f6b669778ec058a555623c6119e0feaaac59c8be
Parents: f0875d7
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 26 12:57:58 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 27 14:24:18 2017 -0700
----------------------------------------------------------------------
CHANGELOG.asciidoc | 3 +-
.../step/map/TraversalVertexProgramStep.java | 5 +
.../optimization/SingleIterationStrategy.java | 132 +++++++++++++++++++
.../process/traversal/TraversalStrategies.java | 2 +
.../SingleIterationStrategyTest.java | 100 ++++++++++++++
.../SparkSingleIterationStrategy.java | 48 ++++++-
.../SparkSingleIterationStrategyTest.java | 89 +++++++++++--
7 files changed, 362 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 95cfb71..860d401 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,7 +26,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-* Fixed a bug where `keepLabels` were not being pushed down into child traversals by `PathRetractionStrategy`.
+* Fixed a bug where `PathProcessor.keepLabels` were not being pushed down into child traversals by `PathRetractionStrategy`.
+* Added `SingleIterationStrategy` as a default `GraphComputer` strategy that can re-write some traversals to not require message passing.
* Fixed a bug associated with user-provided maps and `GroupSideEffectStep`.
* `GroupBiOperator` no longer maintains a detached traversal and thus, no more side-effect related OLAP inconsistencies.
* Added `ProjectedTraverser` which wraps a traverser with a `List<Object>` of projected data.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
index cb7db29..e866ce2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/TraversalVertexProgramStep.java
@@ -54,6 +54,11 @@ public final class TraversalVertexProgramStep extends VertexProgramStep implemen
return Collections.singletonList(this.computerTraversal.get());
}
+ public void setComputerTraversal(final Traversal.Admin<?,?> computerTraversal) { // replaces the traversal held in this step's computerTraversal field
+ this.computerTraversal = new PureTraversal<>(computerTraversal); // wrap the supplied traversal in a new PureTraversal
+ this.integrateChild(this.computerTraversal.get()); // pass the wrapper's get() result to integrateChild so it is attached to this step
+ }
+
@Override
public String toString() {
return StringFactory.stepString(this, this.computerTraversal.get(), new GraphFilter(this.computer));
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
new file mode 100644
index 0000000..efcbe9a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SingleIterationStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy { // wraps the steps after the start step of a TraversalVertexProgramStep's traversal in local() when the checks in apply() pass
+
+ private static final SingleIterationStrategy INSTANCE = new SingleIterationStrategy(); // sole instance, handed out by instance(); the constructor is private
+
+ private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList( // returned by applyPrior(); presumably orders these strategies relative to this one -- confirm against TraversalStrategy
+ IncidentToAdjacentStrategy.class,
+ AdjacentToIncidentStrategy.class,
+ FilterRankingStrategy.class,
+ InlineFilterStrategy.class));
+
+ private static final Set<Class> MULTI_ITERATION_CLASSES = new HashSet<>(Arrays.asList( // step classes whose presence makes apply() treat the traversal as message passing
+ EdgeVertexStep.class,
+ LambdaFlatMapStep.class,
+ LambdaMapStep.class
+ ));
+
+
+ private SingleIterationStrategy() {
+ }
+
+ @Override
+ public void apply(final Traversal.Admin<?, ?> traversal) {
+ // only process the first traversal step in an OLAP chain
+ TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> {
+ final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be, as it's dynamically determined
+ final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone(); // the checks below inspect a clone of the traversal from the program the step generates
+ // does the traversal, as it stands, message pass?
+ boolean doesMessagePass = TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, MULTI_ITERATION_CLASSES, computerTraversal);
+ if (!doesMessagePass) {
+ for (final VertexStep vertexStep : TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, VertexStep.class, computerTraversal)) {
+ if (vertexStep.returnsVertex() || !vertexStep.getDirection().equals(Direction.OUT)) { // in-edges require message pass in OLAP
+ doesMessagePass = true;
+ break;
+ }
+ }
+ } // if the traversal doesn't message pass, then don't try and localize it as it's just wasted computation
+ if (doesMessagePass) {
+ final boolean beyondStarGraph =
+ TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, LambdaHolder.class, computerTraversal) ||
+ !TraversalHelper.isLocalStarGraph(computerTraversal);
+ if (!beyondStarGraph && // if we move beyond the star graph, then localization is not possible.
+ !(computerTraversal.getStartStep().getNextStep() instanceof EmptyStep) && // if it's just a g.V()/E(), then don't localize
+ !(computerTraversal.getStartStep().getNextStep() instanceof LocalStep) && // removes the potential for the infinite recursive application of the traversal
+ !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) && // if the second step is a barrier, no point in trying to localize anything
+ !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, computerTraversal). // this is a strict precaution that could be loosened with deeper logic on barriers in global children
+ stream().
+ filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) {
+
+ final Traversal.Admin<?, ?> newComputerTraversal = step.computerTraversal.getPure(); // the traversal to rewrite comes from the step's PureTraversal via getPure(), not from the clone inspected above
+ final Traversal.Admin localTraversal = new DefaultGraphTraversal<>(); // receives the steps that will be wrapped in local()
+ final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, newComputerTraversal).orElse(null); // first Barrier step, or null when there is none
+ final Step endStep = null == barrier ? newComputerTraversal.getEndStep() : barrier.getPreviousStep(); // the traversal's end step when there is no barrier, otherwise the step just before the barrier
+ if (!(endStep instanceof VertexStep || endStep instanceof EdgeVertexStep)) { // no rewrite when that step is a VertexStep or EdgeVertexStep
+ TraversalHelper.removeToTraversal(newComputerTraversal.getStartStep().getNextStep(), null == barrier ? EmptyStep.instance() : barrier, localTraversal); // move the steps following the start step, up to the barrier if any, into localTraversal
+ assert !localTraversal.getSteps().isEmpty(); // given the if() constraints, this is impossible
+ if (localTraversal.getSteps().size() > 1) { // if it's just a single step, a local wrap will not alter its locus of computation
+ if (null == barrier)
+ TraversalHelper.insertTraversal(0, (Traversal.Admin) __.local(localTraversal), newComputerTraversal); // no barrier: insert the local(...) wrapper using index 0
+ else
+ TraversalHelper.insertTraversal(barrier.getPreviousStep(), (Traversal.Admin) __.local(localTraversal), newComputerTraversal); // barrier present: insert the local(...) wrapper relative to the step now preceding the barrier
+ step.setComputerTraversal(newComputerTraversal); // hand the rewritten traversal back to the step
+ }
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public Set<Class<? extends OptimizationStrategy>> applyPrior() {
+ return PRIORS; // the four strategy classes listed in PRIORS
+ }
+
+ public static SingleIterationStrategy instance() {
+ return INSTANCE; // always the same shared object
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
index 015df70..a1cb99c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.process.traversal;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
@@ -222,6 +223,7 @@ public interface TraversalStrategies extends Serializable, Cloneable {
final TraversalStrategies graphComputerStrategies = new DefaultTraversalStrategies();
graphComputerStrategies.addStrategies(
GraphFilterStrategy.instance(),
+ SingleIterationStrategy.instance(),
OrderLimitStrategy.instance(),
PathProcessorStrategy.instance(),
ComputerVerificationStrategy.instance());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
new file mode 100644
index 0000000..612fb9d
--- /dev/null
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(Parameterized.class)
+public class SingleIterationStrategyTest { // parameterized test: each row pairs an input traversal with the traversal expected after SingleIterationStrategy is applied
+
+ @Parameterized.Parameter(value = 0)
+ public Traversal original; // traversal given to the program step before strategies run
+
+ @Parameterized.Parameter(value = 1)
+ public Traversal optimized; // traversal the step is expected to hold afterwards
+
+ @Parameterized.Parameter(value = 2)
+ public Collection<TraversalStrategy> otherStrategies; // extra strategies registered alongside SingleIterationStrategy for this row
+
+ @Test
+ public void doTest() {
+ final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>(); // empty root traversal that hosts the program step
+ final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin()); // step under test, constructed around the original traversal
+ rootTraversal.addStep(parent.asStep());
+ parent.setComputerTraversal(this.original.asAdmin()); // also passes the original traversal through the setter
+ final TraversalStrategies strategies = new DefaultTraversalStrategies();
+ strategies.addStrategies(SingleIterationStrategy.instance()); // the strategy under test is always registered
+ for (final TraversalStrategy strategy : this.otherStrategies) {
+ strategies.addStrategies(strategy);
+ }
+ rootTraversal.setStrategies(strategies);
+ rootTraversal.asAdmin().applyStrategies();
+ assertEquals(this.optimized, parent.computerTraversal.get()); // the traversal held by the step must equal the expected one
+ }
+
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Iterable<Object[]> generateTestParameters() { // rows are {original, expected, additional strategies}
+
+ return Arrays.asList(new Object[][]{
+ {__.V().out().count(), __.V().outE().count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())}, // with AdjacentToIncidentStrategy registered, out() is expected to become outE()
+ {__.V().id(), __.V().id(), Collections.emptyList()}, // expected unchanged
+ {__.V().out().count(), __.V().out().count(), Collections.emptyList()}, // same input as the first row but no extra strategy: expected unchanged
+ {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()},
+ {__.V().in().id(), __.V().local(__.in().id()), Collections.emptyList()}, // in().id() is expected to be wrapped in local()
+ {__.V().out().id(), __.V().local(__.out().id()), Collections.emptyList()},
+ {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()},
+ {__.V().outE().inV().id().count(), __.V().local(__.outE().inV().id()).count(), Collections.emptyList()}, // count() is expected to stay outside the local() wrapper
+ {__.V().map(__.outE().inV()).count(), __.V().map(__.outE().inV()).count(), Collections.emptyList()},
+ {__.V().out().map(__.outE().inV()).count(), __.V().out().map(__.outE().inV()).count(), Collections.emptyList()},
+ {__.V().outE().map(__.inV()).id().count(), __.V().outE().map(__.inV()).id().count(), Collections.emptyList()},
+ {__.V().outE().map(__.inV()).count(), __.V().outE().map(__.inV()).count(), Collections.emptyList()},
+ {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()},
+ {__.V().outE().inV().count(), __.V().outE().inV().count(), Collections.emptyList()},
+ {__.V().out().id().count(), __.V().local(__.out().id()).count(), Collections.emptyList()},
+ {__.V().in().id().count(), __.V().local(__.in().id()).count(), Collections.emptyList()},
+ {__.V().in().id().select("id-map").dedup().count(), __.V().local(__.in().id().select("id-map")).dedup().count(), Collections.emptyList()}, // steps from dedup() onward are expected to stay outside the local() wrapper
+ {__.V().outE().values("weight"), __.V().outE().values("weight"), Collections.emptyList()},
+ {__.V().outE().values("weight").sum(), __.V().outE().values("weight").sum(), Collections.emptyList()},
+ {__.V().inE().values("weight"), __.V().local(__.inE().values("weight")), Collections.emptyList()},
+ {__.V().inE().values("weight").sum(), __.V().local(__.inE().values("weight")).sum(), Collections.emptyList()},
+ {__.V().inE().values("weight").sum().dedup().count(), __.V().local(__.inE().values("weight")).sum().dedup().count(), Collections.emptyList()},
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
index a4acf4c..2abb9b8 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java
@@ -23,12 +23,23 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory;
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectOneStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.structure.Direction;
@@ -48,9 +59,8 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
private static final Set<Class> MULTI_ITERATION_CLASSES = new HashSet<>(Arrays.asList(
EdgeVertexStep.class,
- LambdaMapStep.class, // maybe?
- LambdaFlatMapStep.class // maybe?
- // VertexStep is special as you need to see if the return class is Edge or Vertex (logic below)
+ LambdaMapStep.class,
+ LambdaFlatMapStep.class
));
private SparkSingleIterationStrategy() {
@@ -63,6 +73,7 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone();
if (!computerTraversal.isLocked())
computerTraversal.applyStrategies();
+ ///
boolean doesMessagePass = TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, MULTI_ITERATION_CLASSES, computerTraversal);
if (!doesMessagePass) {
for (final VertexStep vertexStep : TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, VertexStep.class, computerTraversal)) {
@@ -72,18 +83,45 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg
}
}
}
- if (!doesMessagePass) {
+ if (!doesMessagePass && !SparkSingleIterationStrategy.endsWithInElement(computerTraversal)) {
step.setComputer(step.getComputer()
// if no message passing, don't partition the loaded graph
.configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, true)
- // if no message passing, don't cache the loaded graph
+ // if no message passing, don't cache the loaded graph
.configure(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, true));
}
}
}
+ private static final boolean endsWithInElement(final Traversal.Admin<?, ?> traversal) { // walks backwards from the end step; true when it meets a VertexStep that returns vertices or is not Direction.OUT, or an EdgeVertexStep, directly or inside a child traversal
+ Step<?, ?> current = traversal.getEndStep(); // start at the last step
+ while (!(current instanceof EmptyStep)) { // stop once getPreviousStep() yields an EmptyStep
+ if ((current instanceof VertexStep && (((VertexStep) current).returnsVertex() ||
+ !((VertexStep) current).getDirection().equals(Direction.OUT))) ||
+ current instanceof EdgeVertexStep) {
+ return true;
+ } else if (current instanceof TraversalMapStep || current instanceof TraversalFlatMapStep || current instanceof LocalStep) { // for these parents only the first local child is inspected
+ if (endsWithInElement(((TraversalParent) current).getLocalChildren().get(0)))
+ return true;
+ } else if (current instanceof TraversalParent) { // any other parent: inspect each of its global children
+ if (((TraversalParent) current).getGlobalChildren().stream().filter(SparkSingleIterationStrategy::endsWithInElement).findAny().isPresent())
+ return true;
+ }
+ if (!(current instanceof FilterStep || // only the step kinds listed here are walked through; any other step ends the search with false
+ current instanceof SideEffectStep ||
+ current instanceof SelectStep ||
+ current instanceof SelectOneStep ||
+ current instanceof Barrier)) {
+ return false;
+ }
+ current = current.getPreviousStep(); // continue with the preceding step
+ }
+ return false; // reached the front of the traversal without a match
+ }
+
public static SparkSingleIterationStrategy instance() {
return INSTANCE;
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f6b66977/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
index 4e43438..20596d7 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -32,7 +33,9 @@ import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;
@@ -60,29 +63,93 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest {
configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+ /////////// WITHOUT SINGLE-ITERATION STRATEGY LESS SINGLE-PASS OPTIONS ARE AVAILABLE
+
Graph graph = GraphFactory.open(configuration);
- GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class);
+ GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, SingleIterationStrategy.class);
assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
- assertFalse(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
+ assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent());
+ assertFalse(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
+ assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
- assertTrue(g.V().count().explain().toString().contains(SparkSingleIterationStrategy.class.getSimpleName()));
+ assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent());
test(true, g.V().limit(10));
test(true, g.V().values("age").groupCount());
test(true, g.V().groupCount().by(__.out().count()));
test(true, g.V().outE());
- test(true, 6l, g.V().count());
- test(true, 6l, g.V().out().count());
- test(true, 6l, g.V().local(__.inE()).count());
- test(true, 6l, g.V().outE().inV().count());
+ test(true, 6L, g.V().count());
+ test(true, 6L, g.V().out().count());
+ test(true, 6L, g.V().outE().inV().count());
////
+ test(false, 6L, g.V().local(__.inE()).count());
test(false, g.V().outE().inV());
test(false, g.V().both());
- test(false, 12l, g.V().both().count());
+ test(false, 12L, g.V().both().count());
test(false, g.V().out().id());
- test(false, 2l, g.V().out().out().count());
- test(false, 6l, g.V().in().count());
- test(false, 6l, g.V().inE().count());
+ test(false, 2L, g.V().out().out().count());
+ test(false, 6L, g.V().in().count());
+ test(false, 6L, g.V().inE().count());
+
+ /////////// WITH SINGLE-ITERATION STRATEGY MORE SINGLE-PASS OPTIONS ARE AVAILABLE
+
+ graph = GraphFactory.open(configuration);
+ g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(SingleIterationStrategy.instance());
+ assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
+ assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent());
+ assertTrue(g.getStrategies().toList().contains(SingleIterationStrategy.instance()));
+ assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent());
+ assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance()));
+ assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent());
+
+ test(true, g.V().limit(10));
+ test(true, g.V().values("age").groupCount());
+ test(true, g.V().groupCount().by(__.out().count()));
+ test(true, g.V().outE());
+ test(true, 6L, g.V().outE().values("weight").count());
+ test(true, 6L, g.V().inE().values("weight").count());
+ test(true, 12L, g.V().bothE().values("weight").count());
+ test(true, g.V().bothE().values("weight"));
+ test(true, g.V().bothE().values("weight").limit(2));
+ test(true, 6L, g.V().count());
+ test(true, 6L, g.V().id().count());
+ test(true, 6L, g.V().out().count());
+ test(true, 6L, g.V().outE().inV().count());
+ test(true, 6L, g.V().outE().inV().id().count());
+ test(true, 2L, g.V().outE().inV().id().groupCount().select(Column.values).unfold().dedup().count());
+ test(true, g.V().out().id());
+ test(true, 6L, g.V().outE().valueMap().count());
+ test(true, g.V().outE().valueMap());
+ test(true, 6L, g.V().inE().valueMap().count());
+ test(true, g.V().inE().valueMap());
+ test(true, 12L, g.V().bothE().valueMap().count());
+ test(true, g.V().bothE().valueMap());
+ test(true, 6L, g.V().inE().id().count());
+ test(true, 6L, g.V().outE().count());
+ test(true, 4L, g.V().outE().inV().id().dedup().count());
+ test(true, 6L, g.V().as("a").outE().inV().as("b").id().dedup("a", "b").by(T.id).count());
+ test(true, 4L, g.V().filter(__.in()).count());
+ test(true, 6L, g.V().sideEffect(__.in()).count());
+ /////
+ test(false, 6L, g.V().local(__.inE()).count());
+ test(false, 6L, g.V().outE().outV().count()); // todo: low probability traversal, but none the less could be optimized for
+ test(false, 6L, g.V().in().count());
+ test(false, 6L, g.V().flatMap(__.in()).count());
+ test(false, 4L, g.V().map(__.in()).count());
+ test(false, 6L, g.V().local(__.in()).count());
+ test(false, 6L, g.V().inE().count());
+ test(false, g.V().outE().inV());
+ test(false, g.V().both());
+ test(false, 12L, g.V().both().count());
+ test(false, g.V().outE().inV().dedup());
+ test(false, 4L, g.V().outE().inV().dedup().count());
+ test(false, 2L, g.V().out().out().count());
+ test(false, 6L, g.V().as("a").map(__.both()).select("a").count());
+ test(false, g.V().out().values("name"));
+ test(false, g.V().out().properties("name"));
+ test(false, g.V().out().valueMap());
+ test(false, 6L, g.V().as("a").outE().inV().values("name").as("b").dedup("a", "b").count());
+ test(false, 2L, g.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count());
}
private static <R> void test(boolean singleIteration, final Traversal<?, R> traversal) {