You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 22:56:11 UTC
incubator-tinkerpop git commit: Got GroupCount working with
SparkStarBarrierInterceptor. Going to test on the Blades.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1288 6a256f8fc -> 6e122e42b
Got GroupCount working with SparkStarBarrierInterceptor. Going to test on the Blades.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/6e122e42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/6e122e42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/6e122e42
Branch: refs/heads/TINKERPOP-1288
Commit: 6e122e42b9c49648ce42a295b94d780c3eb11aae
Parents: 6a256f8
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 16:56:04 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 16:56:04 2016 -0600
----------------------------------------------------------------------
.../interceptor/SparkStarBarrierInterceptor.java | 18 ++++++++++++++----
.../SparkInterceptorStrategyTest.java | 9 +++++++++
2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6e122e42/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 3649c8f..76272be 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupCountStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep;
@@ -42,13 +43,14 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequire
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import java.util.Iterator;
+import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -75,7 +77,7 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
((MemoryTraversalSideEffects) traversal.getSideEffects()).setMemory(memory, true); // any intermediate sideEffect steps are backed by SparkMemory
memory.setInExecute(true);
final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
- .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds))
+ .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
.flatMap(vertexWritable -> {
if (identityTraversal) // g.V.count()-style (identity)
return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), EmptyStep.instance(), 1l));
@@ -107,6 +109,13 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
result = nextRDD
.map(traverser -> (Number) traverser.get())
.fold(Integer.MIN_VALUE, NumberHelper::max);
+ else if (endStep instanceof GroupCountStep)
+ result = nextRDD
+ .mapPartitions(partition -> {
+ final Traversal.Admin<?, Map<?, Long>> clone = (Traversal.Admin) endStepTraversal.clone();
+ return () -> IteratorUtils.map(partition, traverser -> TraversalUtil.apply((Traverser.Admin) traverser, clone));
+ })
+ .fold(((GroupCountStep<?, ?>) endStep).getSeedSupplier().get(), (a, b) -> GroupCountStep.GroupCountBiOperator.instance().apply((Map) a, (Map) b));
else
throw new IllegalArgumentException("The end step is an unsupported barrier: " + endStep);
memory.setInExecute(false);
@@ -129,8 +138,9 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
!endStep.getClass().equals(SumGlobalStep.class) &&
!endStep.getClass().equals(MeanGlobalStep.class) &&
!endStep.getClass().equals(MaxGlobalStep.class) &&
- !endStep.getClass().equals(MinGlobalStep.class))
- // TODO: group(), groupCount(), fold(), and tree()
+ !endStep.getClass().equals(MinGlobalStep.class) &&
+ !endStep.getClass().equals(GroupCountStep.class))
+ // TODO: group(), fold(), and tree()
return false;
if (TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, Barrier.class, traversal).size() != 1)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6e122e42/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index f18a944..b48a2bb 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@ -41,6 +41,8 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -117,6 +119,11 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
test(SparkStarBarrierInterceptor.class, 67l, g.V().has("age").has("age", P.gt(30)).values("age").sum());
test(SparkStarBarrierInterceptor.class, 27, g.V().hasLabel("person").values("age").min());
test(SparkStarBarrierInterceptor.class, 35, g.V().hasLabel("person").values("age").max());
+ test(SparkStarBarrierInterceptor.class, new HashMap<String, Long>() {{
+ put("software", 2l);
+ put("person", 4l);
+ }}, g.V().<String>groupCount().by(T.label));
+ test(SparkStarBarrierInterceptor.class, Collections.singletonMap("person", 2l), g.V().has("person", "age", P.lt(30)).<String>groupCount().by(T.label));
/// No interceptor matches
test(2l, g.V().out().out().count());
test(6l, g.E().count());
@@ -128,6 +135,8 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
test(6l, g.V().dedup().count());
test(4l, g.V().hasLabel("person").order().by("age").count());
test(1l, g.V().count().count());
+ test(2l, g.V().limit(2).count());
+ test(3l, g.V().tail(3).count());
}
private static <R> void test(Class<? extends VertexProgramInterceptor> expectedInterceptor, R expectedResult, final Traversal<?, R> traversal) throws Exception {