You are viewing a plain-text version of this content. The canonical link to it (the word "here" in the original) was a hyperlink and is not preserved in this version.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 22:56:11 UTC

incubator-tinkerpop git commit: Got GroupCount working with SparkStarBarrierInterceptor; going to test on the Blades.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1288 6a256f8fc -> 6e122e42b


Got GroupCount working with SparkStarBarrierInterceptor; going to test on the Blades.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/6e122e42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/6e122e42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/6e122e42

Branch: refs/heads/TINKERPOP-1288
Commit: 6e122e42b9c49648ce42a295b94d780c3eb11aae
Parents: 6a256f8
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 16:56:04 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 16:56:04 2016 -0600

----------------------------------------------------------------------
 .../interceptor/SparkStarBarrierInterceptor.java  | 18 ++++++++++++++----
 .../SparkInterceptorStrategyTest.java             |  9 +++++++++
 2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6e122e42/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 3649c8f..76272be 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupCountStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep;
@@ -42,13 +43,14 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequire
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
 import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
-import java.util.Iterator;
+import java.util.Map;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -75,7 +77,7 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
         ((MemoryTraversalSideEffects) traversal.getSideEffects()).setMemory(memory, true); // any intermediate sideEffect steps are backed by SparkMemory
         memory.setInExecute(true);
         final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
-                .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds))
+                .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
                 .flatMap(vertexWritable -> {
                     if (identityTraversal)                          // g.V.count()-style (identity)
                         return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), EmptyStep.instance(), 1l));
@@ -107,6 +109,13 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
             result = nextRDD
                     .map(traverser -> (Number) traverser.get())
                     .fold(Integer.MIN_VALUE, NumberHelper::max);
+        else if (endStep instanceof GroupCountStep)
+            result = nextRDD
+                    .mapPartitions(partition -> {
+                        final Traversal.Admin<?, Map<?, Long>> clone = (Traversal.Admin) endStepTraversal.clone();
+                        return () -> IteratorUtils.map(partition, traverser -> TraversalUtil.apply((Traverser.Admin) traverser, clone));
+                    })
+                    .fold(((GroupCountStep<?, ?>) endStep).getSeedSupplier().get(), (a, b) -> GroupCountStep.GroupCountBiOperator.instance().apply((Map) a, (Map) b));
         else
             throw new IllegalArgumentException("The end step is an unsupported barrier: " + endStep);
         memory.setInExecute(false);
@@ -129,8 +138,9 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
                 !endStep.getClass().equals(SumGlobalStep.class) &&
                 !endStep.getClass().equals(MeanGlobalStep.class) &&
                 !endStep.getClass().equals(MaxGlobalStep.class) &&
-                !endStep.getClass().equals(MinGlobalStep.class))
-            // TODO: group(), groupCount(), fold(), and tree()
+                !endStep.getClass().equals(MinGlobalStep.class) &&
+                !endStep.getClass().equals(GroupCountStep.class))
+            // TODO: group(), fold(), and tree()
             return false;
         if (TraversalHelper.getStepsOfAssignableClassRecursively(Scope.global, Barrier.class, traversal).size() != 1)
             return false;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6e122e42/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index f18a944..b48a2bb 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@ -41,6 +41,8 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -117,6 +119,11 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
         test(SparkStarBarrierInterceptor.class, 67l, g.V().has("age").has("age", P.gt(30)).values("age").sum());
         test(SparkStarBarrierInterceptor.class, 27, g.V().hasLabel("person").values("age").min());
         test(SparkStarBarrierInterceptor.class, 35, g.V().hasLabel("person").values("age").max());
+        test(SparkStarBarrierInterceptor.class, new HashMap<String, Long>() {{
+            put("software", 2l);
+            put("person", 4l);
+        }}, g.V().<String>groupCount().by(T.label));
+        test(SparkStarBarrierInterceptor.class, Collections.singletonMap("person", 2l), g.V().has("person", "age", P.lt(30)).<String>groupCount().by(T.label));
         /// No interceptor matches
         test(2l, g.V().out().out().count());
         test(6l, g.E().count());
@@ -128,6 +135,8 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
         test(6l, g.V().dedup().count());
         test(4l, g.V().hasLabel("person").order().by("age").count());
         test(1l, g.V().count().count());
+        test(2l, g.V().limit(2).count());
+        test(3l, g.V().tail(3).count());
     }
 
     private static <R> void test(Class<? extends VertexProgramInterceptor> expectedInterceptor, R expectedResult, final Traversal<?, R> traversal) throws Exception {