You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 14:40:40 UTC
incubator-tinkerpop git commit: Generalized
SparkVertexCountInterceptor to SparkCountInterceptor and it can recognize and
natively process various g.V.name.count(), g.V.out.count,
etc. type of traversals. Added a few more test cases to AdjacentToIncidentStrategy.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1288 d5e7c7862 -> f0acfd751
Generalized SparkVertexCountInterceptor to SparkCountInterceptor and it can recognize and natively process various g.V.name.count(), g.V.out.count, etc. type of traversals. Added a few more test cases to AdjacentToIncidentStrategy. Came up with a nice test infrastructure for SparkInterceptorStrategyTest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f0acfd75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f0acfd75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f0acfd75
Branch: refs/heads/TINKERPOP-1288
Commit: f0acfd7512330f82d14e0c17d90f46487008b6e1
Parents: d5e7c78
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 08:40:34 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 08:40:34 2016 -0600
----------------------------------------------------------------------
.../AdjacentToIncidentStrategyTest.java | 4 +
.../optimization/SparkInterceptorStrategy.java | 7 +-
.../interceptor/SparkCountInterceptor.java | 113 +++++++++++++++++++
.../SparkVertexCountInterceptor.java | 53 ---------
.../SparkInterceptorStrategyTest.java | 69 +++++++----
5 files changed, 164 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java
index f2668fd..c791cd5 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/AdjacentToIncidentStrategyTest.java
@@ -60,6 +60,10 @@ public class AdjacentToIncidentStrategyTest {
public static Iterable<Object[]> generateTestParameters() {
return Arrays.asList(new Traversal[][]{
+ {__.outE().count(), __.outE().count()},
+ {__.bothE("knows").count(), __.bothE("knows").count()},
+ {__.properties().count(), __.properties().count()},
+ {__.properties("name").count(), __.properties("name").count()},
{__.out().count(), __.outE().count()},
{__.in().count(), __.inE().count()},
{__.both().count(), __.bothE().count()},
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
index a22cef7..b3d250a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
@@ -21,12 +21,11 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.o
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.TraversalMatcher;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkVertexCountInterceptor;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkCountInterceptor;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
@@ -47,8 +46,8 @@ public final class SparkInterceptorStrategy extends AbstractTraversalStrategy<Tr
final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance());
final List<TraversalVertexProgramStep> steps = TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal);
for (final TraversalVertexProgramStep step : steps) {
- if (TraversalMatcher.is_g_V_count(step.generateProgram(graph).getTraversal()))
- step.setComputer(step.getComputer().configure(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkVertexCountInterceptor.class.getCanonicalName()));
+ if (SparkCountInterceptor.isLegal(step.generateProgram(graph).getTraversal()))
+ step.setComputer(step.getComputer().configure(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkCountInterceptor.class.getCanonicalName()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
new file mode 100644
index 0000000..7ca4b61
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkCountInterceptor implements SparkVertexProgramInterceptor<TraversalVertexProgram> {
+
+ public SparkCountInterceptor() {
+
+ }
+
+ @Override
+ public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
+ vertexProgram.setup(memory);
+ final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal();
+ JavaPairRDD<Object, VertexWritable> nextRDD = inputRDD;
+ long count = -1;
+ for (final Step step : traversal.getSteps()) {
+ if (step instanceof GraphStep) {
+ final Object[] ids = ((GraphStep) step).getIds();
+ if (ids.length > 0)
+ nextRDD = nextRDD.filter(tuple -> ElementHelper.idExists(tuple._2().get().id(), ids));
+ } else if (step instanceof HasStep) {
+ final List<HasContainer> hasContainers = ((HasStep<?>) step).getHasContainers();
+ if (hasContainers.size() > 0)
+ nextRDD = nextRDD.filter(tuple -> HasContainer.testAll(tuple._2().get(), hasContainers));
+ } else if (step instanceof PropertiesStep) {
+ final String[] keys = ((PropertiesStep) step).getPropertyKeys();
+ count = nextRDD.mapValues(vertex -> IteratorUtils.count(vertex.get().properties(keys))).fold(new Tuple2<>("sum", 0l), (a, b) -> new Tuple2<>("sum", a._2() + b._2()))._2();
+ } else if (step instanceof VertexStep) {
+ final String[] labels = ((VertexStep) step).getEdgeLabels();
+ final Direction direction = ((VertexStep) step).getDirection();
+ count = nextRDD.mapValues(vertex -> IteratorUtils.count(vertex.get().edges(direction, labels))).fold(new Tuple2<>("sum", 0l), (a, b) -> new Tuple2<>("sum", a._2() + b._2()))._2();
+ } else if (step instanceof CountGlobalStep) {
+ if (count == -1)
+ count = nextRDD.count();
+ }
+ }
+ assert count != -1;
+ final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
+ haltedTraversers.add(traversal.getTraverserGenerator().generate(count, (Step) traversal.getEndStep(), 1l));
+ memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
+ memory.incrIteration();
+ return inputRDD;
+ }
+
+ public static boolean isLegal(final Traversal.Admin<?, ?> traversal) {
+ if (!TraversalHelper.getLabels(traversal).isEmpty())
+ return false;
+ final List<Step> steps = traversal.getSteps();
+ if (!steps.get(0).getClass().equals(GraphStep.class) || ((GraphStep) steps.get(0)).returnsEdge())
+ return false;
+ if (!steps.get(steps.size() - 1).getClass().equals(CountGlobalStep.class))
+ return false;
+ int vertexPropertiesStepCount = 0;
+ for (int i = 1; i < steps.size() - 1; i++) {
+ final Step<?, ?> step = steps.get(i);
+ final Class<? extends Step> stepClass = step.getClass();
+ if (!stepClass.equals(HasStep.class) && !stepClass.equals(PropertiesStep.class) && !stepClass.equals(VertexStep.class))
+ return false;
+ if ((stepClass.equals(VertexStep.class) || stepClass.equals(PropertiesStep.class)) && (++vertexPropertiesStepCount > 1 || !step.getNextStep().getClass().equals(CountGlobalStep.class)))
+ return false;
+ if (stepClass.equals(HasStep.class) && !step.getPreviousStep().getClass().equals(GraphStep.class) && !step.getPreviousStep().getClass().equals(HasStep.class))
+ return false;
+ }
+ return true;
+
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkVertexCountInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkVertexCountInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkVertexCountInterceptor.java
deleted file mode 100644
index 8feb628..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkVertexCountInterceptor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
-import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkVertexCountInterceptor implements SparkVertexProgramInterceptor<TraversalVertexProgram> {
-
- public SparkVertexCountInterceptor() {
-
- }
-
- @Override
- public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
- vertexProgram.setup(memory);
- final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal();
- final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
- haltedTraversers.add(traversal.getTraverserGenerator().generate(inputRDD.count(), (Step) traversal.getEndStep(), 1l));
- memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
- memory.incrIteration();
- return inputRDD;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f0acfd75/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index 1e73372..c1cf8cf 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@ -24,15 +24,18 @@ import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.VertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkVertexCountInterceptor;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor.SparkCountInterceptor;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;
@@ -55,40 +58,56 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString()));
configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
///
Graph graph = GraphFactory.open(configuration);
GraphTraversalSource g = graph.traversal().withComputer();
assertTrue(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance()));
assertTrue(g.V().count().explain().toString().contains(SparkInterceptorStrategy.class.getSimpleName()));
- //
- assertEquals(6l, g.V().count().next().longValue());
- assertEquals(2l, g.V().out().out().count().next().longValue());
+ /// SparkCountInterceptor matches
+ test(SparkCountInterceptor.class, 6l, g.V().count());
+ test(SparkCountInterceptor.class, 2l, g.V().hasLabel("software").count());
+ test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).count());
+ test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).values("name").count());
+ test(SparkCountInterceptor.class, 2l, g.V().hasLabel("person").has("age", P.gt(30)).properties("name").count());
+ test(SparkCountInterceptor.class, 4l, g.V().hasLabel("person").has("age", P.gt(30)).properties("name", "age").count());
+ test(SparkCountInterceptor.class, 3l, g.V().hasLabel("person").has("age", P.gt(30)).out().count());
+ test(SparkCountInterceptor.class, 0l, g.V().hasLabel("person").has("age", P.gt(30)).out("knows").count());
+ test(SparkCountInterceptor.class, 3l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age", P.gt(30)).out("created").count());
+ test(SparkCountInterceptor.class, 3l, g.V(1).out().count());
+ test(SparkCountInterceptor.class, 2l, g.V(1).out("knows").count());
+ test(SparkCountInterceptor.class, 3l, g.V(1).out("knows", "created").count());
+ test(SparkCountInterceptor.class, 5l, g.V(1, 4).out("knows", "created").count());
+ test(SparkCountInterceptor.class, 1l, g.V(2).in("knows").count());
+ test(SparkCountInterceptor.class, 0l, g.V(6).has("name", "peter").in().count());
+ /// No interceptor matches
+ test(2l, g.V().out().out().count());
+ test(6l, g.E().count());
+ test(6l, g.V().as("a").values("name").as("b").count());
+ test(6l, g.V().as("a").count());
+ test(1l, g.V().has("name", "marko").as("a").values("name").as("b").count());
+ test(2l, g.V().out().out().count());
+ test(6l, g.V().out().values("name").count());
+ test(2l, g.V().out("knows").values("name").count());
+ test(3l, g.V().in().has("name", "marko").count());
+ test(4l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age").out("created").count()); // TODO: filter(values()) should be okay.
}
- @Test
- public void shouldInterceptExceptedTraversals() throws Exception {
- final Configuration configuration = getBaseConfiguration();
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, TestHelper.makeTestDataDirectory(SparkPartitionAwareStrategyTest.class, UUID.randomUUID().toString()));
- configuration.setProperty(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
- ///
- Graph graph = GraphFactory.open(configuration);
- GraphTraversalSource g = graph.traversal().withComputer();
- //
- assertEquals(SparkVertexCountInterceptor.class.getCanonicalName(), getInterceptor(g.V().count()));
- assertEquals(SparkVertexCountInterceptor.class.getCanonicalName(), getInterceptor(g.V().identity().identity().count()));
- assertNull(getInterceptor(g.V().out().count()));
- assertNull(getInterceptor(g.V().as("a").count()));
- }
-
- private static String getInterceptor(final Traversal<?, ?> traversal) {
- traversal.asAdmin().applyStrategies();
- return (String) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal.asAdmin()).get()
+ private static <R> void test(Class<? extends VertexProgramInterceptor> expectedInterceptor, R expectedResult, final Traversal<?, R> traversal) throws Exception {
+ final Traversal.Admin<?, ?> clone = traversal.asAdmin().clone();
+ clone.applyStrategies();
+ final String interceptor = (String) TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, clone).get()
.getComputer()
.getConfiguration()
.getOrDefault(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, null);
+ if (null == expectedInterceptor)
+ assertNull(interceptor);
+ else
+ assertEquals(expectedInterceptor, Class.forName(interceptor));
+ assertEquals(expectedResult, traversal.next());
+ }
+ private static <R> void test(R expectedResult, final Traversal<?, R> traversal) throws Exception {
+ test(null, expectedResult, traversal);
}
}