Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 19:49:00 UTC

[15/15] incubator-tinkerpop git commit: Merged master into this branch. Had to fix up SparkGraphComputer conflicts because of ThreadInterrupt work from @spmallette. Added some Scope-based recursive methods to TraversalHelper. SparkCountInterceptor is now s

Merged master into this branch. Had to fix up SparkGraphComputer conflicts because of the ThreadInterrupt work from @spmallette. Added some Scope-based recursive methods to TraversalHelper. SparkCountInterceptor is now smart about sideEffects. Added more tests to ensure proper interception.
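
For reference, the new Scope-aware overloads in TraversalHelper take a Scope argument that restricts which child traversals are descended into (Scope.local, Scope.global, or null for both). Below is a minimal sketch of how they might be called; the traversal itself is only illustrative and is not part of this commit:

import java.util.Arrays;
import java.util.List;

import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;

public class ScopeRecursionSketch {
    public static void main(final String[] args) {
        // illustrative traversal: the dedup() barrier lives in repeat()'s global child traversal
        final Traversal.Admin<?, ?> traversal = __.repeat(__.out().dedup()).times(2).count().asAdmin();

        // only global children are searched for barrier-style steps (the same check
        // SparkCountInterceptor.isLegal() uses to decide whether it can intercept)
        final boolean hasGlobalBarrier = TraversalHelper.hasStepOfAssignableClassRecursively(
                Scope.global, Arrays.asList(CollectingBarrierStep.class, DedupGlobalStep.class), traversal);

        // the original no-Scope overloads still search both local and global children
        final List<DedupGlobalStep> dedups =
                TraversalHelper.getStepsOfAssignableClassRecursively(DedupGlobalStep.class, traversal);

        System.out.println(hasGlobalBarrier + " / " + dedups.size());
    }
}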


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/01fda967
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/01fda967
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/01fda967

Branch: refs/heads/TINKERPOP-1288
Commit: 01fda96707335f2e33b073cef980c7844ca6d80c
Parents: 71f66c5 5fafb0b
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 13:48:39 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 13:48:39 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   3 +
 docker/scripts/build.sh                         |   1 +
 .../upgrade/release-3.2.x-incubating.asciidoc   |  47 +++++++-
 .../benchmark/util/AbstractBenchmarkBase.java   |   2 +-
 .../traversal/step/map/VertexProgramStep.java   |  20 +++-
 .../gremlin/process/remote/RemoteGraph.java     |   8 ++
 .../traversal/step/util/AbstractStep.java       |   3 +
 .../process/traversal/util/TraversalHelper.java |  73 +++++++++---
 .../util/TraversalInterruptedException.java     |  27 +++++
 .../traversal/util/DefaultTraversalTest.java    |  40 +++++++
 gremlin-driver/pom.xml                          |   1 -
 .../process/AbstractGremlinProcessTest.java     |   8 +-
 .../gremlin/process/ProcessComputerSuite.java   |   4 +
 .../gremlin/process/ProcessStandardSuite.java   |   2 +
 .../TraversalInterruptionComputerTest.java      |  97 ++++++++++++++++
 .../traversal/TraversalInterruptionTest.java    | 114 +++++++++++++++++++
 .../gremlin/hadoop/structure/HadoopGraph.java   |   9 ++
 pom.xml                                         |   5 +
 .../process/computer/SparkGraphComputer.java    |  24 +++-
 .../interceptor/SparkCountInterceptor.java      |   8 +-
 .../SparkInterceptorStrategyTest.java           |   4 +
 tinkergraph-gremlin/pom.xml                     |   4 +
 .../process/computer/TinkerGraphComputer.java   |  30 ++++-
 .../process/computer/TinkerWorkerPool.java      |  12 +-
 24 files changed, 513 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/01fda967/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
----------------------------------------------------------------------
diff --cc gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
index 98b337c,fe76381..09ea337
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
@@@ -19,10 -19,8 +19,11 @@@
  package org.apache.tinkerpop.gremlin.process.traversal.util;
  
  import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
++import org.apache.tinkerpop.gremlin.process.traversal.Scope;
  import org.apache.tinkerpop.gremlin.process.traversal.Step;
  import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 +import org.apache.tinkerpop.gremlin.process.traversal.lambda.ElementValueTraversal;
 +import org.apache.tinkerpop.gremlin.process.traversal.lambda.TokenTraversal;
  import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder;
  import org.apache.tinkerpop.gremlin.process.traversal.step.Scoping;
  import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
@@@ -228,16 -222,16 +229,24 @@@ public final class TraversalHelper 
      }
  
      public static <S> List<S> getStepsOfAssignableClassRecursively(final Class<S> stepClass, final Traversal.Admin<?, ?> traversal) {
++        return getStepsOfAssignableClassRecursively(null, stepClass, traversal);
++    }
++
++    public static <S> List<S> getStepsOfAssignableClassRecursively(final Scope scope, final Class<S> stepClass, final Traversal.Admin<?, ?> traversal) {
          final List<S> list = new ArrayList<>();
          for (final Step<?, ?> step : traversal.getSteps()) {
              if (stepClass.isAssignableFrom(step.getClass()))
                  list.add((S) step);
              if (step instanceof TraversalParent) {
--                for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
--                    list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(stepClass, localChild));
++                if (null == scope || Scope.local.equals(scope)) {
++                    for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
++                        list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(stepClass, localChild));
++                    }
                  }
--                for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
--                    list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(stepClass, globalChild));
++                if (null == scope || Scope.global.equals(scope)) {
++                    for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
++                        list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(stepClass, globalChild));
++                    }
                  }
              }
          }
@@@ -287,7 -281,7 +296,7 @@@
  
      /**
       * Determine if the traversal has a step of an assignable class in the current {@link Traversal} and its
--     * child traversals.
++     * local and global child traversals.
       *
       * @param stepClass the step class to look for
       * @param traversal the traversal in which to look for the given step class
@@@ -295,16 -289,16 +304,34 @@@
       * given <code>stepClass</code>, otherwise <code>false</code>.
       */
      public static boolean hasStepOfAssignableClassRecursively(final Class stepClass, final Traversal.Admin<?, ?> traversal) {
++        return hasStepOfAssignableClassRecursively(null, stepClass, traversal);
++    }
++
++    /**
++     * Determine if the traversal has a step of an assignable class in the current {@link Traversal} and its
++     * {@link Scope} child traversals.
++     *
++     * @param scope     the child traversal scope to check
++     * @param stepClass the step class to look for
++     * @param traversal the traversal in which to look for the given step class
++     * @return <code>true</code> if any step in the given traversal (and its child traversals) is an instance of the
++     * given <code>stepClass</code>, otherwise <code>false</code>.
++     */
++    public static boolean hasStepOfAssignableClassRecursively(final Scope scope, final Class stepClass, final Traversal.Admin<?, ?> traversal) {
          for (final Step<?, ?> step : traversal.getSteps()) {
              if (stepClass.isAssignableFrom(step.getClass())) {
                  return true;
              }
              if (step instanceof TraversalParent) {
--                for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
--                    if (hasStepOfAssignableClassRecursively(stepClass, localChild)) return true;
++                if (null == scope || Scope.local.equals(scope)) {
++                    for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
++                        if (hasStepOfAssignableClassRecursively(stepClass, localChild)) return true;
++                    }
                  }
--                for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
--                    if (hasStepOfAssignableClassRecursively(stepClass, globalChild)) return true;
++                if (null == scope || Scope.global.equals(scope)) {
++                    for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
++                        if (hasStepOfAssignableClassRecursively(stepClass, globalChild)) return true;
++                    }
                  }
              }
          }
@@@ -313,7 -307,7 +340,7 @@@
  
      /**
       * Determine if the traversal has any of the supplied steps of an assignable class in the current {@link Traversal}
--     * and its child traversals.
++     * and its global or local child traversals.
       *
       * @param stepClasses the step classes to look for
       * @param traversal   the traversal in which to look for the given step classes
@@@ -321,6 -315,6 +348,20 @@@
       * provided in <code>stepClasses</code>, otherwise <code>false</code>.
       */
      public static boolean hasStepOfAssignableClassRecursively(final Collection<Class> stepClasses, final Traversal.Admin<?, ?> traversal) {
++        return hasStepOfAssignableClassRecursively(null, stepClasses, traversal);
++    }
++
++    /**
++     * Determine if the traversal has any of the supplied steps of an assignable class in the current {@link Traversal}
++     * and its {@link Scope} child traversals.
++     *
++     * @param scope       whether to check global or local children (null for both).
++     * @param stepClasses the step classes to look for
++     * @param traversal   the traversal in which to look for the given step classes
++     * @return <code>true</code> if any step in the given traversal (and its child traversals) is an instance of a class
++     * provided in <code>stepClasses</code>, otherwise <code>false</code>.
++     */
++    public static boolean hasStepOfAssignableClassRecursively(final Scope scope, final Collection<Class> stepClasses, final Traversal.Admin<?, ?> traversal) {
          if (stepClasses.size() == 1)
              return hasStepOfAssignableClassRecursively(stepClasses.iterator().next(), traversal);
          for (final Step<?, ?> step : traversal.getSteps()) {
@@@ -328,11 -322,11 +369,15 @@@
                  return true;
              }
              if (step instanceof TraversalParent) {
--                for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
--                    if (hasStepOfAssignableClassRecursively(stepClasses, localChild)) return true;
++                if (null == scope || Scope.local.equals(scope)) {
++                    for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
++                        if (hasStepOfAssignableClassRecursively(stepClasses, localChild)) return true;
++                    }
                  }
--                for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
--                    if (hasStepOfAssignableClassRecursively(stepClasses, globalChild)) return true;
++                if (null == scope || Scope.global.equals(scope)) {
++                    for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
++                        if (hasStepOfAssignableClassRecursively(stepClasses, globalChild)) return true;
++                    }
                  }
              }
          }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/01fda967/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index b7ec133,bc8bc50..82b3369
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@@ -51,11 -52,8 +52,12 @@@ import org.apache.tinkerpop.gremlin.pro
  import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
  import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
  import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
  import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkInterceptorStrategy;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkPartitionAwareStrategy;
  import org.apache.tinkerpop.gremlin.spark.structure.Spark;
  import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
  import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
@@@ -71,9 -69,11 +73,12 @@@ import org.apache.tinkerpop.gremlin.str
  
  import java.io.File;
  import java.io.IOException;
 +import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.Executor;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
+ import java.util.concurrent.ThreadFactory;
  import java.util.stream.Stream;
  
  /**
@@@ -81,15 -81,17 +86,22 @@@
   */
  public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
  
 -
 +    private final org.apache.commons.configuration.Configuration sparkConfiguration;
 +    private boolean workersSet = false;
+     private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build();
+ 
+     /**
+      * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
+      * for a {@link VertexProgram} a single threaded executor is sufficient.
+      */
+     private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);
  
 -    private final org.apache.commons.configuration.Configuration sparkConfiguration;
 -    private boolean workersSet = false;
 +    static {
 +        TraversalStrategies.GlobalCache.registerStrategies(SparkGraphComputer.class,
 +                TraversalStrategies.GlobalCache.getStrategies(GraphComputer.class).clone().addStrategies(
 +                        SparkPartitionAwareStrategy.instance(),
 +                        SparkInterceptorStrategy.instance()));
 +    }
  
      public SparkGraphComputer(final HadoopGraph hadoopGraph) {
          super(hadoopGraph);
@@@ -120,8 -122,8 +132,8 @@@
      }
  
      private Future<ComputerResult> submitWithExecutor(Executor exec) {
--        // create the completable future
-         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
++        // create the completable future
+         return computerService.submit(() -> {
              final long startTime = System.currentTimeMillis();
              // apache and hadoop configurations that are used throughout the graph computer computation
              final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
@@@ -240,43 -236,34 +252,47 @@@
                  // process the vertex program //
                  ////////////////////////////////
                  if (null != this.vertexProgram) {
 -                    // set up the vertex program and wire up configurations
 -                    JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
                      memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
 -                    this.vertexProgram.setup(memory);
 -                    memory.broadcastMemory(sparkContext);
 -                    final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
 -                    this.vertexProgram.storeState(vertexProgramConfiguration);
 -                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
 -                    ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
 -                    // execute the vertex program
 -                    while (true) {
 -                        if (Thread.interrupted()) {
 -                            sparkContext.cancelAllJobs();
 -                            throw new TraversalInterruptedException();
 +                    /////////////////
 +                    // if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
 +                    if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
 +                        try {
 +                            final SparkVertexProgramInterceptor<VertexProgram> interceptor =
 +                                    (SparkVertexProgramInterceptor) Class.forName(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
 +                            computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
 +                        } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
 +                            throw new IllegalStateException(e.getMessage());
                          }
 -                        memory.setInExecute(true);
 -                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
 -                        memory.setInExecute(false);
 -                        if (this.vertexProgram.terminate(memory))
 -                            break;
 -                        else {
 -                            memory.incrIteration();
 -                            memory.broadcastMemory(sparkContext);
 +                    } else {  // standard GraphComputer semantics
 +                        // set up the vertex program and wire up configurations
 +                        this.vertexProgram.setup(memory);
 +                        JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
 +                        memory.broadcastMemory(sparkContext);
 +                        final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
 +                        this.vertexProgram.storeState(vertexProgramConfiguration);
 +                        ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
 +                        ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
 +                        // execute the vertex program
 +                        while (true) {
++                            if (Thread.interrupted()) {
++                                sparkContext.cancelAllJobs();
++                                throw new TraversalInterruptedException();
++                            }
 +                            memory.setInExecute(true);
 +                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
 +                            memory.setInExecute(false);
 +                            if (this.vertexProgram.terminate(memory))
 +                                break;
 +                            else {
 +                                memory.incrIteration();
 +                                memory.broadcastMemory(sparkContext);
 +                            }
                          }
 +                        // write the computed graph to the respective output (rdd or output format)
 +                        computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
                      }
 +                    /////////////////
                      memory.complete(); // drop all transient memory keys
 -                    // write the computed graph to the respective output (rdd or output format)
 -                    computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
                      if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
                          outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
                      }
@@@ -405,4 -391,4 +421,4 @@@
          final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
          new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
      }
--}
++}
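
As an aside, because the job now runs on the single-threaded "boss" executor above, the Thread.interrupted() check in the iteration loop can be triggered from the caller's side through the returned Future. A minimal sketch (not part of this commit) of cancelling a running Spark OLAP job; the properties file path and the choice of PageRankVertexProgram are illustrative:

import java.util.concurrent.Future;

import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class CancelSparkJobSketch {
    public static void main(final String[] args) throws Exception {
        final Graph graph = GraphFactory.open("conf/hadoop-gryo.properties"); // path is illustrative
        final Future<ComputerResult> result = graph.compute(SparkGraphComputer.class)
                .program(PageRankVertexProgram.build().create(graph))
                .submit();
        // ... later, from another thread: interrupting the "SparkGraphComputer-boss" thread is
        // observed between iterations, which cancels all Spark jobs and throws
        // TraversalInterruptedException inside the boss task
        result.cancel(true);
        graph.close();
    }
}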

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/01fda967/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
index e737108,0000000..f85d6db
mode 100644,000000..100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
@@@ -1,97 -1,0 +1,103 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor;
 +
 +import org.apache.spark.api.java.JavaPairRDD;
 +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 +import org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
 +import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
++import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 +import org.apache.tinkerpop.gremlin.process.traversal.Step;
 +import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
++import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep;
 +import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
 +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
++import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
 +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
 +import org.apache.tinkerpop.gremlin.structure.Vertex;
 +import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 +
++import java.util.Arrays;
 +import java.util.Iterator;
 +import java.util.List;
 +
 +/**
 + * @author Marko A. Rodriguez (http://markorodriguez.com)
 + */
 +public final class SparkCountInterceptor implements SparkVertexProgramInterceptor<TraversalVertexProgram> {
 +
 +    public SparkCountInterceptor() {
 +
 +    }
 +
 +    @Override
 +    public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
 +        vertexProgram.setup(memory);
 +        final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
 +        final Object[] graphStepIds = ((GraphStep) traversal.getStartStep()).getIds();    // any V(1,2,3)-style ids to filter on
 +        final CountGlobalStep countGlobalStep = (CountGlobalStep) traversal.getEndStep(); // needed for the final traverser generation
 +        traversal.removeStep(0);                                    // remove GraphStep
 +        traversal.removeStep(traversal.getSteps().size() - 1);      // remove CountGlobalStep
 +        traversal.applyStrategies();                                // compile
 +        boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
 +
 +        ((MemoryTraversalSideEffects) traversal.getSideEffects()).setMemory(memory, true); // any intermediate sideEffect steps are backed by SparkMemory
 +        memory.setInExecute(true);
 +        final long count = inputRDD
 +                .filter(tuple -> ElementHelper.idExists(tuple._2().get().id(), graphStepIds))
 +                .flatMapValues(vertexWritable -> {
 +                    if (identityTraversal)                          // g.V.count()-style (identity)
 +                        return () -> (Iterator) IteratorUtils.of(vertexWritable);
 +                    else {                                          // add the vertex to head of the traversal
 +                        return () -> {                              // and iterate it for its results
-                             final Traversal.Admin<Vertex, ?> clone = traversal.clone();
++                            final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
 +                            clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), EmptyStep.instance(), 1l));
 +                            return clone;
 +                        };
 +                    }
 +                }).count();
 +        memory.setInExecute(false);
 +
 +        // generate the HALTED_TRAVERSERS for the memory
 +        final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
 +        haltedTraversers.add(traversal.getTraverserGenerator().generate(count, countGlobalStep, 1l));
 +        memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
 +        memory.incrIteration(); // any local star graph reduction take a single iteration
 +        return inputRDD;
 +    }
 +
 +    public static boolean isLegal(final Traversal.Admin<?, ?> traversal) {
 +        final List<Step> steps = traversal.getSteps();
 +        if (!steps.get(0).getClass().equals(GraphStep.class) || ((GraphStep) steps.get(0)).returnsEdge())
 +            return false;
 +        if (!steps.get(steps.size() - 1).getClass().equals(CountGlobalStep.class))
 +            return false;
++        if (TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, Arrays.asList(CollectingBarrierStep.class, DedupGlobalStep.class), traversal))
++            return false;
 +        return TraversalHelper.isLocalStarGraph(traversal);
 +
 +    }
 +}
 +

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/01fda967/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index c89aed8,0000000..4529fd3
mode 100644,000000..100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@@ -1,141 -1,0 +1,145 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization;
 +
 +import org.apache.commons.configuration.Configuration;
 +import org.apache.tinkerpop.gremlin.TestHelper;
 +import org.apache.tinkerpop.gremlin.hadoop.Constants;
 +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 +import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.VertexProgramInterceptor;
 +import org.apache.tinkerpop.gremlin.process.traversal.P;
 +import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
++import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 +import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkCountInterceptor;
 +import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 +import org.apache.tinkerpop.gremlin.structure.Graph;
 +import org.apache.tinkerpop.gremlin.structure.T;
 +import org.apache.tinkerpop.gremlin.structure.Vertex;
 +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 +import org.junit.Test;
 +
 +import java.util.Map;
 +import java.util.UUID;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
 +
 +/**
 + * @author Marko A. Rodriguez (http://markorodriguez.com)
 + */
 +public class SparkInterceptorStrategyTest extends AbstractSparkTest {
 +
 +    @Test
 +    public void shouldHandleSideEffectsCorrectly() throws Exception {
 +        final Configuration configuration = getBaseConfiguration();
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString()));
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
 +        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
 +        ///
 +        Graph graph = GraphFactory.open(configuration);
 +        GraphTraversalSource g = graph.traversal().withComputer();
 +        assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
 +        assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
 +        /// groupCount(m)-test
 +        Traversal.Admin<Vertex, Long> traversal = g.V().groupCount("m").by(T.label).count().asAdmin();
 +        test(SparkCountInterceptor.class, 6l, traversal);
 +        assertEquals(1, traversal.getSideEffects().keys().size());
 +        assertTrue(traversal.getSideEffects().exists("m"));
 +        assertTrue(traversal.getSideEffects().keys().contains("m"));
 +        final Map<String, Long> map = traversal.getSideEffects().get("m");
 +        assertEquals(2, map.size());
 +        assertEquals(2, map.get("software").intValue());
 +        assertEquals(4, map.get("person").intValue());
 +    }
 +
 +    @Test
 +    public void shouldSuccessfullyEvaluateInterceptedTraversals() throws Exception {
 +        final Configuration configuration = getBaseConfiguration();
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString()));
 +        configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
 +        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
 +        ///
 +        Graph graph = GraphFactory.open(configuration);
 +        GraphTraversalSource g = graph.traversal().withComputer();
 +        assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
 +        assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
 +        /// SparkCountInterceptor matches
 +        test(SparkCountInterceptor.class, 6l, g.V().count());
 +        test(SparkCountInterceptor.class, 2l, g.V().hasLabel("software").count());
 +        test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).count());
 +        test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).values("name").count());
 +        test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).properties("name").count());
 +        test(SparkCountInterceptor.class, 4l, g.V().hasLabel("person").has("age", P.gt(30)).properties("name", "age").count());
 +        test(SparkCountInterceptor.class, 3l, g.V().hasLabel("person").has("age", P.gt(30)).out().count());
 +        test(SparkCountInterceptor.class, 0l, g.V().hasLabel("person").has("age", P.gt(30)).out("knows").count());
 +        test(SparkCountInterceptor.class, 3l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age", P.gt(30)).out("created").count());
 +        test(SparkCountInterceptor.class, 3l, g.V(1).out().count());
 +        test(SparkCountInterceptor.class, 2l, g.V(1).out("knows").count());
 +        test(SparkCountInterceptor.class, 3l, g.V(1).out("knows", "created").count());
 +        test(SparkCountInterceptor.class, 5l, g.V(1, 4).out("knows", "created").count());
 +        test(SparkCountInterceptor.class, 1l, g.V(2).in("knows").count());
 +        test(SparkCountInterceptor.class, 0l, g.V(6).has("name", "peter").in().count());
 +        test(SparkCountInterceptor.class, 6l, g.V().as("a").values("name").as("b").count());
 +        test(SparkCountInterceptor.class, 6l, g.V().as("a").count());
 +        test(SparkCountInterceptor.class, 1l, g.V().has("name", "marko").as("a").values("name").as("b").count());
 +        test(SparkCountInterceptor.class, 4l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age").out("created").count());
 +        /// No interceptor matches
 +        test(2l, g.V().out().out().count());
 +        test(6l, g.E().count());
 +        test(2l, g.V().out().out().count());
 +        test(6l, g.V().out().values("name").count());
 +        test(2l, g.V().out("knows").values("name").count());
 +        test(3l, g.V().in().has("name", "marko").count());
++        test(6l, g.V().repeat(__.dedup()).times(2).count());
++        test(6l, g.V().dedup().count());
++        test(4l, g.V().hasLabel("person").order().by("age").count());
 +    }
 +
 +    private static <R> void test(Class<? extends VertexProgramInterceptor> expectedInterceptor, R expectedResult, final Traversal<?, R> traversal) throws Exception {
 +        final Traversal.Admin<?, ?> clone = traversal.asAdmin().clone();
 +        clone.applyStrategies();
 +        final String interceptor = (String) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, clone).get()
 +                .getComputer()
 +                .getConfiguration()
 +                .getOrDefault(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, null);
 +        if (null == expectedInterceptor)
 +            assertNull(interceptor);
 +        else
 +            assertEquals(expectedInterceptor, Class.forName(interceptor));
 +        assertEquals(expectedResult, traversal.next());
 +    }
 +
 +    private static <R> void test(R expectedResult, final Traversal<?, R> traversal) throws Exception {
 +        test(null, expectedResult, traversal);
 +    }
 +}