You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 14:40:40 UTC

incubator-tinkerpop git commit: Generalized SparkVertexCountInterceptor into SparkCountInterceptor, which can recognize and natively process traversals of the form g.V.name.count(), g.V.out.count(), etc. Added a few more test cases to AdjacentToIncidentStrategy.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1288 d5e7c7862 -> f0acfd751


Generalized SparkVertexCountInterceptor into SparkCountInterceptor, which can recognize and natively process traversals of the form g.V.name.count(), g.V.out.count(), etc. Added a few more test cases to AdjacentToIncidentStrategy. Came up with a nice test infrastructure for SparkInterceptorStrategyTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f0acfd75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f0acfd75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f0acfd75

Branch: refs/heads/TINKERPOP-1288
Commit: f0acfd7512330f82d14e0c17d90f46487008b6e1
Parents: d5e7c78
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 08:40:34 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 08:40:34 2016 -0600

----------------------------------------------------------------------
 .../AdjacentToIncidentStrategyTest.java         |   4 +
 .../optimization/SparkInterceptorStrategy.java  |   7 +-
 .../interceptor/SparkCountInterceptor.java      | 113 +++++++++++++++++++
 .../SparkVertexCountInterceptor.java            |  53 ---------
 .../SparkInterceptorStrategyTest.java           |  69 +++++++----
 5 files changed, 164 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java
index f2668fd..c791cd5 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java
@@ -60,6 +60,10 @@ public class AdjacentToIncidentStrategyTest {
     public static Iterable<Object[]> generateTestParameters() {
 
         return Arrays.asList(new Traversal[][]{
+                {__.outE().count(), __.outE().count()},
+                {__.bothE("knows").count(), __.bothE("knows").count()},
+                {__.properties().count(), __.properties().count()},
+                {__.properties("name").count(), __.properties("name").count()},
                 {__.out().count(), __.outE().count()},
                 {__.in().count(), __.inE().count()},
                 {__.both().count(), __.bothE().count()},

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
index a22cef7..b3d250a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
@@ -21,12 +21,11 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.o
 
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.TraversalMatcher;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkVertexCountInterceptor;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkCountInterceptor;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
 
@@ -47,8 +46,8 @@ public final class SparkInterceptorStrategy extends AbstractTraversalStrategy<Tr
         final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance());
         final List<TraversalVertexProgramStep> steps = TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal);
         for (final TraversalVertexProgramStep step : steps) {
-            if (TraversalMatcher.is_g_V_count(step.generateProgram(graph).getTraversal()))
-                step.setComputer(step.getComputer().configure(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkVertexCountInterceptor.class.getCanonicalName()));
+            if (SparkCountInterceptor.isLegal(step.generateProgram(graph).getTraversal()))
+                step.setComputer(step.getComputer().configure(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkCountInterceptor.class.getCanonicalName()));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
new file mode 100644
index 0000000..7ca4b61
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkCountInterceptor implements SparkVertexProgramInterceptor<TraversalVertexProgram> {
+
+    public SparkCountInterceptor() {
+
+    }
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
+        vertexProgram.setup(memory);
+        final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal();
+        JavaPairRDD<Object, VertexWritable> nextRDD = inputRDD;
+        long count = -1;
+        for (final Step step : traversal.getSteps()) {
+            if (step instanceof GraphStep) {
+                final Object[] ids = ((GraphStep) step).getIds();
+                if (ids.length > 0)
+                    nextRDD = nextRDD.filter(tuple -> ElementHelper.idExists(tuple._2().get().id(), ids));
+            } else if (step instanceof HasStep) {
+                final List<HasContainer> hasContainers = ((HasStep<?>) step).getHasContainers();
+                if (hasContainers.size() > 0)
+                    nextRDD = nextRDD.filter(tuple -> HasContainer.testAll(tuple._2().get(), hasContainers));
+            } else if (step instanceof PropertiesStep) {
+                final String[] keys = ((PropertiesStep) step).getPropertyKeys();
+                count = nextRDD.mapValues(vertex -> IteratorUtils.count(vertex.get().properties(keys))).fold(new Tuple2<>("sum", 0l), (a, b) -> new Tuple2<>("sum", a._2() + b._2()))._2();
+            } else if (step instanceof VertexStep) {
+                final String[] labels = ((VertexStep) step).getEdgeLabels();
+                final Direction direction = ((VertexStep) step).getDirection();
+                count = nextRDD.mapValues(vertex -> IteratorUtils.count(vertex.get().edges(direction, labels))).fold(new Tuple2<>("sum", 0l), (a, b) -> new Tuple2<>("sum", a._2() + b._2()))._2();
+            } else if (step instanceof CountGlobalStep) {
+                if (count == -1)
+                    count = nextRDD.count();
+            }
+        }
+        assert count != -1;
+        final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
+        haltedTraversers.add(traversal.getTraverserGenerator().generate(count, (Step) traversal.getEndStep(), 1l));
+        memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
+        memory.incrIteration();
+        return inputRDD;
+    }
+
+    public static boolean isLegal(final Traversal.Admin<?, ?> traversal) {
+        if (!TraversalHelper.getLabels(traversal).isEmpty())
+            return false;
+        final List<Step> steps = traversal.getSteps();
+        if (!steps.get(0).getClass().equals(GraphStep.class) || ((GraphStep) steps.get(0)).returnsEdge())
+            return false;
+        if (!steps.get(steps.size() - 1).getClass().equals(CountGlobalStep.class))
+            return false;
+        int vertexPropertiesStepCount = 0;
+        for (int i = 1; i < steps.size() - 1; i++) {
+            final Step<?, ?> step = steps.get(i);
+            final Class<? extends Step> stepClass = step.getClass();
+            if (!stepClass.equals(HasStep.class) && !stepClass.equals(PropertiesStep.class) && !stepClass.equals(VertexStep.class))
+                return false;
+            if ((stepClass.equals(VertexStep.class) || stepClass.equals(PropertiesStep.class)) && (++vertexPropertiesStepCount > 1 || !step.getNextStep().getClass().equals(CountGlobalStep.class)))
+                return false;
+            if (stepClass.equals(HasStep.class) && !step.getPreviousStep().getClass().equals(GraphStep.class) && !step.getPreviousStep().getClass().equals(HasStep.class))
+                return false;
+        }
+        return true;
+
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkVertexCountInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkVertexCountInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkVertexCountInterceptor.java
deleted file mode 100644
index 8feb628..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkVertexCountInterceptor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
-import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkVertexCountInterceptor implements SparkVertexProgramInterceptor<TraversalVertexProgram> {
-
-    public SparkVertexCountInterceptor() {
-
-    }
-
-    @Override
-    public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
-        vertexProgram.setup(memory);
-        final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal();
-        final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
-        haltedTraversers.add(traversal.getTraverserGenerator().generate(inputRDD.count(), (Step) traversal.getEndStep(), 1l));
-        memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
-        memory.incrIteration();
-        return inputRDD;
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index 1e73372..c1cf8cf 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@ -24,15 +24,18 @@ import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.VertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkVertexCountInterceptor;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkCountInterceptor;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
 
@@ -55,40 +58,56 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString()));
         configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
         ///
         Graph graph = GraphFactory.open(configuration);
         GraphTraversalSource g = graph.traversal().withComputer();
         assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
         assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
-        //
-        assertEquals(6l, g.V().count().next().longValue());
-        assertEquals(2l, g.V().out().out().count().next().longValue());
+        /// SparkCountInterceptor matches
+        test(SparkCountInterceptor.class, 6l, g.V().count());
+        test(SparkCountInterceptor.class, 2l, g.V().hasLabel("software").count());
+        test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).count());
+        test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).values("name").count());
+        test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).properties("name").count());
+        test(SparkCountInterceptor.class, 4l, g.V().hasLabel("person").has("age", P.gt(30)).properties("name", "age").count());
+        test(SparkCountInterceptor.class, 3l, g.V().hasLabel("person").has("age", P.gt(30)).out().count());
+        test(SparkCountInterceptor.class, 0l, g.V().hasLabel("person").has("age", P.gt(30)).out("knows").count());
+        test(SparkCountInterceptor.class, 3l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age", P.gt(30)).out("created").count());
+        test(SparkCountInterceptor.class, 3l, g.V(1).out().count());
+        test(SparkCountInterceptor.class, 2l, g.V(1).out("knows").count());
+        test(SparkCountInterceptor.class, 3l, g.V(1).out("knows", "created").count());
+        test(SparkCountInterceptor.class, 5l, g.V(1, 4).out("knows", "created").count());
+        test(SparkCountInterceptor.class, 1l, g.V(2).in("knows").count());
+        test(SparkCountInterceptor.class, 0l, g.V(6).has("name", "peter").in().count());
+        /// No interceptor matches
+        test(2l, g.V().out().out().count());
+        test(6l, g.E().count());
+        test(6l, g.V().as("a").values("name").as("b").count());
+        test(6l, g.V().as("a").count());
+        test(1l, g.V().has("name", "marko").as("a").values("name").as("b").count());
+        test(2l, g.V().out().out().count());
+        test(6l, g.V().out().values("name").count());
+        test(2l, g.V().out("knows").values("name").count());
+        test(3l, g.V().in().has("name", "marko").count());
+        test(4l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age").out("created").count()); // TODO: filter(values()) should be okay.
     }
 
-    @Test
-    public void shouldInterceptExceptedTraversals() throws Exception {
-        final Configuration configuration = getBaseConfiguration();
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString()));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
-        ///
-        Graph graph = GraphFactory.open(configuration);
-        GraphTraversalSource g = graph.traversal().withComputer();
-        //
-        assertEquals(SparkVertexCountInterceptor.class.getCanonicalName(), getInterceptor(g.V().count()));
-        assertEquals(SparkVertexCountInterceptor.class.getCanonicalName(), getInterceptor(g.V().identity().identity().count()));
-        assertNull(getInterceptor(g.V().out().count()));
-        assertNull(getInterceptor(g.V().as("a").count()));
-    }
-
-    private static String getInterceptor(final Traversal<?, ?> traversal) {
-        traversal.asAdmin().applyStrategies();
-        return (String) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal.asAdmin()).get()
+    private static <R> void test(Class<? extends VertexProgramInterceptor> expectedInterceptor, R expectedResult, final Traversal<?, R> traversal) throws Exception {
+        final Traversal.Admin<?, ?> clone = traversal.asAdmin().clone();
+        clone.applyStrategies();
+        final String interceptor = (String) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, clone).get()
                 .getComputer()
                 .getConfiguration()
                 .getOrDefault(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, null);
+        if (null == expectedInterceptor)
+            assertNull(interceptor);
+        else
+            assertEquals(expectedInterceptor, Class.forName(interceptor));
+        assertEquals(expectedResult, traversal.next());
+    }
 
+    private static <R> void test(R expectedResult, final Traversal<?, R> traversal) throws Exception {
+        test(null, expectedResult, traversal);
     }
 }