You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/06 16:08:59 UTC

[05/18] incubator-tinkerpop git commit: DefaultTraversal.hashCode() was bad. Fixed. Added NativeInterceptor which right now is tied to SparkGraphComputer, but should be generalizable to all GraphComputer providers. In short, instead of executing the Vert

DefaultTraversal.hashCode() was bad. Fixed. Added NativeInterceptor which right now is tied to SparkGraphComputer, but should be generalizable to all GraphComputer providers. In short, instead of executing the VertexProgram, an interceptor can take the Graph/Memory and pop out a Graph/Memory. Thus, skipping the VertexProgram execution. Right now I have VertexCountInterceptor which does inputRDD.count(). BAM. Works. Added more test cases and cleanup. Going to probably just roost on this branch for a bit until I get the concept clear and simple. Pushing so I can benchmark on the Blades.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/7c103c8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/7c103c8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/7c103c8b

Branch: refs/heads/master
Commit: 7c103c8b0bf218c5eb6ec83ccfe5d416fd671e3d
Parents: 6e92c4e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue May 3 17:10:29 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue May 3 17:10:29 2016 -0600

----------------------------------------------------------------------
 .../traversal/util/DefaultTraversal.java        |  4 +-
 .../tinkerpop/gremlin/hadoop/Constants.java     |  1 +
 .../process/computer/NativeInterceptor.java     | 32 +++++++
 .../process/computer/SparkGraphComputer.java    | 55 ++++++-----
 .../optimization/SparkInterceptorStrategy.java  | 60 ++++++++++++
 .../SparkPartitionAwareStrategy.java            |  2 +-
 .../interceptors/VertexCountInterceptor.java    | 52 +++++++++++
 .../SparkInterceptorStrategyTest.java           | 96 ++++++++++++++++++++
 8 files changed, 278 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7c103c8b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
index e7d3257..78db94a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java
@@ -297,12 +297,12 @@ public class DefaultTraversal<S, E> implements Traversal.Admin<S, E> {
 
     @Override
     public boolean equals(final Object other) {
-        return other != null && other.getClass().equals(this.getClass()) && this.equals(((Traversal.Admin) other));
+        return other != null && other instanceof Traversal.Admin && this.equals(((Traversal.Admin) other));
     }
 
     @Override
     public int hashCode() {
-        int result = this.getClass().hashCode();
+        int result = 1;
         for (final Step step : this.asAdmin().getSteps()) {
             result ^= step.hashCode();
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7c103c8b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 872b3fb..ab45903 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -58,6 +58,7 @@ public final class Constants {
     public static final String GREMLIN_SPARK_GRAPH_STORAGE_LEVEL = "gremlin.spark.graphStorageLevel";
     public static final String GREMLIN_SPARK_PERSIST_STORAGE_LEVEL = "gremlin.spark.persistStorageLevel";
     public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner";
+    public static final String GREMLIN_SPARK_NATIVE_INTERCEPTOR = "gremlin.spark.nativeInterceptor";
     public static final String SPARK_SERIALIZER = "spark.serializer";
 
     public static String getGraphLocation(final String location) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7c103c8b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/NativeInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/NativeInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/NativeInterceptor.java
new file mode 100644
index 0000000..8448a1b
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/NativeInterceptor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface NativeInterceptor<V extends VertexProgram> {
+
+    public JavaPairRDD<Object, VertexWritable> apply(final V vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7c103c8b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index cd01779..49de96a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -53,6 +53,7 @@ import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization.SparkInterceptorStrategy;
 import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization.SparkPartitionAwareStrategy;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
@@ -83,7 +84,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     private boolean workersSet = false;
 
     static {
-        TraversalStrategies.GlobalCache.registerStrategies(SparkGraphComputer.class, TraversalStrategies.GlobalCache.getStrategies(GraphComputer.class).clone().addStrategies(SparkPartitionAwareStrategy.instance()));
+        TraversalStrategies.GlobalCache.registerStrategies(SparkGraphComputer.class,
+                TraversalStrategies.GlobalCache.getStrategies(GraphComputer.class).clone().addStrategies(
+                        SparkPartitionAwareStrategy.instance(),
+                        SparkInterceptorStrategy.instance()));
     }
 
     public SparkGraphComputer(final HadoopGraph hadoopGraph) {
@@ -228,7 +232,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
                 }
                 // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
-                if (!inputFromSpark || partitioned || filtered)
+                if (!skipPartitioner && (!inputFromSpark || partitioned || filtered))
                     loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
 
 
@@ -236,30 +240,39 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // process the vertex program //
                 ////////////////////////////////
                 if (null != this.vertexProgram) {
-                    // set up the vertex program and wire up configurations
-                    JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
                     memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                     this.vertexProgram.setup(memory);
-                    memory.broadcastMemory(sparkContext);
-                    final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
-                    this.vertexProgram.storeState(vertexProgramConfiguration);
-                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
-                    ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
-                    // execute the vertex program
-                    while (true) {
-                        memory.setInExecute(true);
-                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
-                        memory.setInExecute(false);
-                        if (this.vertexProgram.terminate(memory))
-                            break;
-                        else {
-                            memory.incrIteration();
-                            memory.broadcastMemory(sparkContext);
+                    if (apacheConfiguration.containsKey(Constants.GREMLIN_SPARK_NATIVE_INTERCEPTOR)) {
+                        try {
+                            final NativeInterceptor<VertexProgram> interceptor = (NativeInterceptor) Class.forName(apacheConfiguration.getString(Constants.GREMLIN_SPARK_NATIVE_INTERCEPTOR)).newInstance();
+                            computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
+                        } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+                            throw new IllegalStateException(e.getMessage());
                         }
+                    } else {
+                        // set up the vertex program and wire up configurations
+                        JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
+                        memory.broadcastMemory(sparkContext);
+                        final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
+                        this.vertexProgram.storeState(vertexProgramConfiguration);
+                        ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+                        ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+                        // execute the vertex program
+                        while (true) {
+                            memory.setInExecute(true);
+                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+                            memory.setInExecute(false);
+                            if (this.vertexProgram.terminate(memory))
+                                break;
+                            else {
+                                memory.incrIteration();
+                                memory.broadcastMemory(sparkContext);
+                            }
+                        }
+                        // write the computed graph to the respective output (rdd or output format)
+                        computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
                     }
                     memory.complete(); // drop all transient memory keys
-                    // write the computed graph to the respective output (rdd or output format)
-                    computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
                     if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
                         outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7c103c8b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkInterceptorStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkInterceptorStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkInterceptorStrategy.java
new file mode 100644
index 0000000..2ad1ad6
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkInterceptorStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization;
+
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization.interceptors.VertexCountInterceptor;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkInterceptorStrategy extends AbstractTraversalStrategy<TraversalStrategy.ProviderOptimizationStrategy> implements TraversalStrategy.ProviderOptimizationStrategy {
+
+    private static final SparkInterceptorStrategy INSTANCE = new SparkInterceptorStrategy();
+
+    private SparkInterceptorStrategy() {
+    }
+
+    @Override
+    public void apply(final Traversal.Admin<?, ?> traversal) {
+        final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance());
+        final List<TraversalVertexProgramStep> steps = TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal);
+        for (final TraversalVertexProgramStep step : steps) {
+            if (step.generateProgram(graph).getTraversal().equals(graph.traversal().V().count())) {
+                step.setComputer(step.getComputer().configure(Constants.GREMLIN_SPARK_NATIVE_INTERCEPTOR, VertexCountInterceptor.class.getCanonicalName()));
+            }
+        }
+    }
+
+    public static SparkInterceptorStrategy instance() {
+        return INSTANCE;
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7c103c8b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java
index 02aa1ae..5c28d41 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkPartitionAwareStrategy.java
@@ -41,7 +41,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkPartitionAwareStrategy extends AbstractTraversalStrategy<TraversalStrategy.ProviderOptimizationStrategy> implements TraversalStrategy.ProviderOptimizationStrategy {
+public final class SparkPartitionAwareStrategy extends AbstractTraversalStrategy<TraversalStrategy.ProviderOptimizationStrategy> implements TraversalStrategy.ProviderOptimizationStrategy {
 
     private static final SparkPartitionAwareStrategy INSTANCE = new SparkPartitionAwareStrategy();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7c103c8b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/interceptors/VertexCountInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/interceptors/VertexCountInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/interceptors/VertexCountInterceptor.java
new file mode 100644
index 0000000..b685677
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/interceptors/VertexCountInterceptor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization.interceptors;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.spark.process.computer.NativeInterceptor;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VertexCountInterceptor implements NativeInterceptor<TraversalVertexProgram> {
+
+    public VertexCountInterceptor() {
+
+    }
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
+        final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal();
+        final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
+        haltedTraversers.add(traversal.getTraverserGenerator().generate(inputRDD.count(), (Step) traversal.getEndStep(), 1l));
+        memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
+        memory.incrIteration();
+        return inputRDD;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7c103c8b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkInterceptorStrategyTest.java
new file mode 100644
index 0000000..bb009eb
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/optimization/SparkInterceptorStrategyTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.optimization.interceptors.VertexCountInterceptor;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkInterceptorStrategyTest extends AbstractSparkTest {
+
+    @Test
+    public void shouldUseVertexCountInterceptor() throws Exception {
+        final String outputLocation = TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString());
+        Configuration configuration = getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+        ///
+        Graph graph = GraphFactory.open(configuration);
+        GraphTraversalSource g = graph.traversal().withComputer();
+        assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
+        assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
+        //
+        assertEquals(6l, g.V().count().next().longValue());
+        assertEquals(2l, g.V().out().out().count().next().longValue());
+    }
+
+    @Test
+    public void shouldSetConfigurationsCorrectly() throws Exception {
+        final String outputLocation = TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString());
+        Configuration configuration = getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+
+        Graph graph = GraphFactory.open(configuration);
+        GraphTraversalSource g = graph.traversal().withComputer();
+
+        assertEquals(VertexCountInterceptor.class.getCanonicalName(), getInterceptor(g.V().count()));
+        assertEquals(VertexCountInterceptor.class.getCanonicalName(), getInterceptor(g.V().identity().identity().count()));
+        assertNull(getInterceptor(g.V().out().count()));
+        assertNull(getInterceptor(g.V().as("a").count()));
+    }
+
+    private static String getInterceptor(final Traversal<?, ?> traversal) {
+        traversal.asAdmin().applyStrategies();
+        return (String) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal.asAdmin()).get()
+                .getComputer()
+                .getConfiguration()
+                .getOrDefault(Constants.GREMLIN_SPARK_NATIVE_INTERCEPTOR, null);
+
+    }
+}