You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/06 16:09:05 UTC
[11/18] incubator-tinkerpop git commit: merged master into this
branch. Had to fix up SparkGraphComputer conflicts caused by the ThreadInterrupt
work from @spmallette. Added some Scope-based recursive methods to
TraversalHelper. SparkCountInterceptor is now smart about sideEffects…
Merged master into this branch. Had to fix up SparkGraphComputer conflicts caused by the ThreadInterrupt work from @spmallette. Added some Scope-based recursive methods to TraversalHelper. SparkCountInterceptor is now smart about sideEffects. Added more tests to ensure proper interception.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/01fda967
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/01fda967
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/01fda967
Branch: refs/heads/master
Commit: 01fda96707335f2e33b073cef980c7844ca6d80c
Parents: 71f66c5 5fafb0b
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 13:48:39 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 13:48:39 2016 -0600
----------------------------------------------------------------------
CHANGELOG.asciidoc | 3 +
docker/scripts/build.sh | 1 +
.../upgrade/release-3.2.x-incubating.asciidoc | 47 +++++++-
.../benchmark/util/AbstractBenchmarkBase.java | 2 +-
.../traversal/step/map/VertexProgramStep.java | 20 +++-
.../gremlin/process/remote/RemoteGraph.java | 8 ++
.../traversal/step/util/AbstractStep.java | 3 +
.../process/traversal/util/TraversalHelper.java | 73 +++++++++---
.../util/TraversalInterruptedException.java | 27 +++++
.../traversal/util/DefaultTraversalTest.java | 40 +++++++
gremlin-driver/pom.xml | 1 -
.../process/AbstractGremlinProcessTest.java | 8 +-
.../gremlin/process/ProcessComputerSuite.java | 4 +
.../gremlin/process/ProcessStandardSuite.java | 2 +
.../TraversalInterruptionComputerTest.java | 97 ++++++++++++++++
.../traversal/TraversalInterruptionTest.java | 114 +++++++++++++++++++
.../gremlin/hadoop/structure/HadoopGraph.java | 9 ++
pom.xml | 5 +
.../process/computer/SparkGraphComputer.java | 24 +++-
.../interceptor/SparkCountInterceptor.java | 8 +-
.../SparkInterceptorStrategyTest.java | 4 +
tinkergraph-gremlin/pom.xml | 4 +
.../process/computer/TinkerGraphComputer.java | 30 ++++-
.../process/computer/TinkerWorkerPool.java | 12 +-
24 files changed, 513 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/01fda967/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
----------------------------------------------------------------------
diff --cc gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
index 98b337c,fe76381..09ea337
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
@@@ -19,10 -19,8 +19,11 @@@
package org.apache.tinkerpop.gremlin.process.traversal.util;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
++import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.ElementValueTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.TokenTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder;
import org.apache.tinkerpop.gremlin.process.traversal.step.Scoping;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
@@@ -228,16 -222,16 +229,24 @@@ public final class TraversalHelper
}
public static <S> List<S> getStepsOfAssignableClassRecursively(final Class<S> stepClass, final Traversal.Admin<?, ?> traversal) {
++ return getStepsOfAssignableClassRecursively(null, stepClass, traversal);
++ }
++
++ public static <S> List<S> getStepsOfAssignableClassRecursively(final Scope scope, final Class<S> stepClass, final Traversal.Admin<?, ?> traversal) {
final List<S> list = new ArrayList<>();
for (final Step<?, ?> step : traversal.getSteps()) {
if (stepClass.isAssignableFrom(step.getClass()))
list.add((S) step);
if (step instanceof TraversalParent) {
-- for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
-- list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(stepClass, localChild));
++ if (null == scope || Scope.local.equals(scope)) {
++ for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
++ list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(stepClass, localChild));
++ }
}
-- for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
-- list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(stepClass, globalChild));
++ if (null == scope || Scope.global.equals(scope)) {
++ for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
++ list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(stepClass, globalChild));
++ }
}
}
}
@@@ -287,7 -281,7 +296,7 @@@
/**
* Determine if the traversal has a step of an assignable class in the current {@link Traversal} and its
-- * child traversals.
++ * local and global child traversals.
*
* @param stepClass the step class to look for
* @param traversal the traversal in which to look for the given step class
@@@ -295,16 -289,16 +304,34 @@@
* given <code>stepClass</code>, otherwise <code>false</code>.
*/
public static boolean hasStepOfAssignableClassRecursively(final Class stepClass, final Traversal.Admin<?, ?> traversal) {
++ return hasStepOfAssignableClassRecursively(null, stepClass, traversal);
++ }
++
++ /**
++ * Determine if the traversal has a step of an assignable class in the current {@link Traversal} and its
++ * {@link Scope} child traversals.
++ *
++ * @param scope the child traversal scope to check
++ * @param stepClass the step class to look for
++ * @param traversal the traversal in which to look for the given step class
++ * @return <code>true</code> if any step in the given traversal (and its child traversals) is an instance of the
++ * given <code>stepClass</code>, otherwise <code>false</code>.
++ */
++ public static boolean hasStepOfAssignableClassRecursively(final Scope scope, final Class stepClass, final Traversal.Admin<?, ?> traversal) {
for (final Step<?, ?> step : traversal.getSteps()) {
if (stepClass.isAssignableFrom(step.getClass())) {
return true;
}
if (step instanceof TraversalParent) {
-- for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
-- if (hasStepOfAssignableClassRecursively(stepClass, localChild)) return true;
++ if (null == scope || Scope.local.equals(scope)) {
++ for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
++ if (hasStepOfAssignableClassRecursively(stepClass, localChild)) return true;
++ }
}
-- for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
-- if (hasStepOfAssignableClassRecursively(stepClass, globalChild)) return true;
++ if (null == scope || Scope.global.equals(scope)) {
++ for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
++ if (hasStepOfAssignableClassRecursively(stepClass, globalChild)) return true;
++ }
}
}
}
@@@ -313,7 -307,7 +340,7 @@@
/**
* Determine if the traversal has any of the supplied steps of an assignable class in the current {@link Traversal}
-- * and its child traversals.
++ * and its global or local child traversals.
*
* @param stepClasses the step classes to look for
* @param traversal the traversal in which to look for the given step classes
@@@ -321,6 -315,6 +348,20 @@@
* provided in <code>stepClasses</code>, otherwise <code>false</code>.
*/
public static boolean hasStepOfAssignableClassRecursively(final Collection<Class> stepClasses, final Traversal.Admin<?, ?> traversal) {
++ return hasStepOfAssignableClassRecursively(null, stepClasses, traversal);
++ }
++
++ /**
++ * Determine if the traversal has any of the supplied steps of an assignable class in the current {@link Traversal}
++ * and its {@link Scope} child traversals.
++ *
++ * @param scope whether to check global or local children (null for both).
++ * @param stepClasses the step classes to look for
++ * @param traversal the traversal in which to look for the given step classes
++ * @return <code>true</code> if any step in the given traversal (and its child traversals) is an instance of a class
++ * provided in <code>stepClasses</code>, otherwise <code>false</code>.
++ */
++ public static boolean hasStepOfAssignableClassRecursively(final Scope scope, final Collection<Class> stepClasses, final Traversal.Admin<?, ?> traversal) {
if (stepClasses.size() == 1)
return hasStepOfAssignableClassRecursively(stepClasses.iterator().next(), traversal);
for (final Step<?, ?> step : traversal.getSteps()) {
@@@ -328,11 -322,11 +369,15 @@@
return true;
}
if (step instanceof TraversalParent) {
-- for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
-- if (hasStepOfAssignableClassRecursively(stepClasses, localChild)) return true;
++ if (null == scope || Scope.local.equals(scope)) {
++ for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
++ if (hasStepOfAssignableClassRecursively(stepClasses, localChild)) return true;
++ }
}
-- for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
-- if (hasStepOfAssignableClassRecursively(stepClasses, globalChild)) return true;
++ if (null == scope || Scope.global.equals(scope)) {
++ for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
++ if (hasStepOfAssignableClassRecursively(stepClasses, globalChild)) return true;
++ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/01fda967/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index b7ec133,bc8bc50..82b3369
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@@ -51,11 -52,8 +52,12 @@@ import org.apache.tinkerpop.gremlin.pro
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkInterceptorStrategy;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkPartitionAwareStrategy;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
@@@ -71,9 -69,11 +73,12 @@@ import org.apache.tinkerpop.gremlin.str
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+ import java.util.concurrent.ThreadFactory;
import java.util.stream.Stream;
/**
@@@ -81,15 -81,17 +86,22 @@@
*/
public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
-
+ private final org.apache.commons.configuration.Configuration sparkConfiguration;
+ private boolean workersSet = false;
+ private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build();
+
+ /**
+ * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
+ * for a {@link VertexProgram} a single threaded executor is sufficient.
+ */
+ private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
- private final org.apache.commons.configuration.Configuration sparkConfiguration;
- private boolean workersSet = false;
+ static {
+ TraversalStrategies.GlobalCache.registerStrategies(SparkGraphComputer.class,
+ TraversalStrategies.GlobalCache.getStrategies(GraphComputer.class).clone().addStrategies(
+ SparkPartitionAwareStrategy.instance(),
+ SparkInterceptorStrategy.instance()));
+ }
public SparkGraphComputer(final HadoopGraph hadoopGraph) {
super(hadoopGraph);
@@@ -120,8 -122,8 +132,8 @@@
}
private Future<ComputerResult> submitWithExecutor(Executor exec) {
-- // create the completable future �
- return CompletableFuture.<ComputerResult>supplyAsync(() -> {
++ // create the completable future
+ return computerService.submit(() -> {
final long startTime = System.currentTimeMillis();
// apache and hadoop configurations that are used throughout the graph computer computation
final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
@@@ -240,43 -236,34 +252,47 @@@
// process the vertex program //
////////////////////////////////
if (null != this.vertexProgram) {
- // set up the vertex program and wire up configurations
- JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
- this.vertexProgram.setup(memory);
- memory.broadcastMemory(sparkContext);
- final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
- this.vertexProgram.storeState(vertexProgramConfiguration);
- ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
- ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
- // execute the vertex program
- while (true) {
- if (Thread.interrupted()) {
- sparkContext.cancelAllJobs();
- throw new TraversalInterruptedException();
+ /////////////////
+ // if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
+ if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
+ try {
+ final SparkVertexProgramInterceptor<VertexProgram> interceptor =
+ (SparkVertexProgramInterceptor) Class.forName(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
+ computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
+ } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+ throw new IllegalStateException(e.getMessage());
}
- memory.setInExecute(true);
- viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
- memory.setInExecute(false);
- if (this.vertexProgram.terminate(memory))
- break;
- else {
- memory.incrIteration();
- memory.broadcastMemory(sparkContext);
+ } else { // standard GraphComputer semantics
+ // set up the vertex program and wire up configurations
+ this.vertexProgram.setup(memory);
+ JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
+ memory.broadcastMemory(sparkContext);
+ final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
+ this.vertexProgram.storeState(vertexProgramConfiguration);
+ ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+ ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+ // execute the vertex program
+ while (true) {
++ if (Thread.interrupted()) {
++ sparkContext.cancelAllJobs();
++ throw new TraversalInterruptedException();
++ }
+ memory.setInExecute(true);
+ viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+ memory.setInExecute(false);
+ if (this.vertexProgram.terminate(memory))
+ break;
+ else {
+ memory.incrIteration();
+ memory.broadcastMemory(sparkContext);
+ }
}
+ // write the computed graph to the respective output (rdd or output format)
+ computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
}
+ /////////////////
memory.complete(); // drop all transient memory keys
- // write the computed graph to the respective output (rdd or output format)
- computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
}
@@@ -405,4 -391,4 +421,4 @@@
final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
}
--}
++}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/01fda967/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
index e737108,0000000..f85d6db
mode 100644,000000..100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
@@@ -1,97 -1,0 +1,103 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
++import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
++import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
++import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
++import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkCountInterceptor implements SparkVertexProgramInterceptor<TraversalVertexProgram> {
+
+ public SparkCountInterceptor() {
+
+ }
+
+ @Override
+ public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
+ vertexProgram.setup(memory);
+ final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
+ final Object[] graphStepIds = ((GraphStep) traversal.getStartStep()).getIds(); // any V(1,2,3)-style ids to filter on
+ final CountGlobalStep countGlobalStep = (CountGlobalStep) traversal.getEndStep(); // needed for the final traverser generation
+ traversal.removeStep(0); // remove GraphStep
+ traversal.removeStep(traversal.getSteps().size() - 1); // remove CountGlobalStep
+ traversal.applyStrategies(); // compile
+ boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
+
+ ((MemoryTraversalSideEffects) traversal.getSideEffects()).setMemory(memory, true); // any intermediate sideEffect steps are backed by SparkMemory
+ memory.setInExecute(true);
+ final long count = inputRDD
+ .filter(tuple -> ElementHelper.idExists(tuple._2().get().id(), graphStepIds))
+ .flatMapValues(vertexWritable -> {
+ if (identityTraversal) // g.V.count()-style (identity)
+ return () -> (Iterator) IteratorUtils.of(vertexWritable);
+ else { // add the vertex to head of the traversal
+ return () -> { // and iterate it for its results
- final Traversal.Admin<Vertex, ?> clone = traversal.clone();
++ final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
+ clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), EmptyStep.instance(), 1l));
+ return clone;
+ };
+ }
+ }).count();
+ memory.setInExecute(false);
+
+ // generate the HALTED_TRAVERSERS for the memory
+ final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
+ haltedTraversers.add(traversal.getTraverserGenerator().generate(count, countGlobalStep, 1l));
+ memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
+ memory.incrIteration(); // any local star graph reduction take a single iteration
+ return inputRDD;
+ }
+
+ public static boolean isLegal(final Traversal.Admin<?, ?> traversal) {
+ final List<Step> steps = traversal.getSteps();
+ if (!steps.get(0).getClass().equals(GraphStep.class) || ((GraphStep) steps.get(0)).returnsEdge())
+ return false;
+ if (!steps.get(steps.size() - 1).getClass().equals(CountGlobalStep.class))
+ return false;
++ if (TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, Arrays.asList(CollectingBarrierStep.class, DedupGlobalStep.class), traversal))
++ return false;
+ return TraversalHelper.isLocalStarGraph(traversal);
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/01fda967/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index c89aed8,0000000..4529fd3
mode 100644,000000..100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@@ -1,141 -1,0 +1,145 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.VertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
++import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkCountInterceptor;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkInterceptorStrategyTest extends AbstractSparkTest {
+
+ @Test
+ public void shouldHandleSideEffectsCorrectly() throws Exception {
+ final Configuration configuration = getBaseConfiguration();
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString()));
+ configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+ ///
+ Graph graph = GraphFactory.open(configuration);
+ GraphTraversalSource g = graph.traversal().withComputer();
+ assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
+ assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
+ /// groupCount(m)-test
+ Traversal.Admin<Vertex, Long> traversal = g.V().groupCount("m").by(T.label).count().asAdmin();
+ test(SparkCountInterceptor.class, 6l, traversal);
+ assertEquals(1, traversal.getSideEffects().keys().size());
+ assertTrue(traversal.getSideEffects().exists("m"));
+ assertTrue(traversal.getSideEffects().keys().contains("m"));
+ final Map<String, Long> map = traversal.getSideEffects().get("m");
+ assertEquals(2, map.size());
+ assertEquals(2, map.get("software").intValue());
+ assertEquals(4, map.get("person").intValue());
+ }
+
+ @Test
+ public void shouldSuccessfullyEvaluateInterceptedTraversals() throws Exception {
+ final Configuration configuration = getBaseConfiguration();
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString()));
+ configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+ ///
+ Graph graph = GraphFactory.open(configuration);
+ GraphTraversalSource g = graph.traversal().withComputer();
+ assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
+ assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
+ /// SparkCountInterceptor matches
+ test(SparkCountInterceptor.class, 6l, g.V().count());
+ test(SparkCountInterceptor.class, 2l, g.V().hasLabel("software").count());
+ test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).count());
+ test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).values("name").count());
+ test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).properties("name").count());
+ test(SparkCountInterceptor.class, 4l, g.V().hasLabel("person").has("age", P.gt(30)).properties("name", "age").count());
+ test(SparkCountInterceptor.class, 3l, g.V().hasLabel("person").has("age", P.gt(30)).out().count());
+ test(SparkCountInterceptor.class, 0l, g.V().hasLabel("person").has("age", P.gt(30)).out("knows").count());
+ test(SparkCountInterceptor.class, 3l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age", P.gt(30)).out("created").count());
+ test(SparkCountInterceptor.class, 3l, g.V(1).out().count());
+ test(SparkCountInterceptor.class, 2l, g.V(1).out("knows").count());
+ test(SparkCountInterceptor.class, 3l, g.V(1).out("knows", "created").count());
+ test(SparkCountInterceptor.class, 5l, g.V(1, 4).out("knows", "created").count());
+ test(SparkCountInterceptor.class, 1l, g.V(2).in("knows").count());
+ test(SparkCountInterceptor.class, 0l, g.V(6).has("name", "peter").in().count());
+ test(SparkCountInterceptor.class, 6l, g.V().as("a").values("name").as("b").count());
+ test(SparkCountInterceptor.class, 6l, g.V().as("a").count());
+ test(SparkCountInterceptor.class, 1l, g.V().has("name", "marko").as("a").values("name").as("b").count());
+ test(SparkCountInterceptor.class, 4l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age").out("created").count());
+ /// No interceptor matches
+ test(2l, g.V().out().out().count());
+ test(6l, g.E().count());
+ test(2l, g.V().out().out().count());
+ test(6l, g.V().out().values("name").count());
+ test(2l, g.V().out("knows").values("name").count());
+ test(3l, g.V().in().has("name", "marko").count());
++ test(6l, g.V().repeat(__.dedup()).times(2).count());
++ test(6l, g.V().dedup().count());
++ test(4l, g.V().hasLabel("person").order().by("age").count());
+ }
+
+ private static <R> void test(Class<? extends VertexProgramInterceptor> expectedInterceptor, R expectedResult, final Traversal<?, R> traversal) throws Exception {
+ final Traversal.Admin<?, ?> clone = traversal.asAdmin().clone();
+ clone.applyStrategies();
+ final String interceptor = (String) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, clone).get()
+ .getComputer()
+ .getConfiguration()
+ .getOrDefault(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, null);
+ if (null == expectedInterceptor)
+ assertNull(interceptor);
+ else
+ assertEquals(expectedInterceptor, Class.forName(interceptor));
+ assertEquals(expectedResult, traversal.next());
+ }
+
+ private static <R> void test(R expectedResult, final Traversal<?, R> traversal) throws Exception {
+ test(null, expectedResult, traversal);
+ }
+}