You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 17:26:10 UTC

incubator-tinkerpop git commit: just generalized SparkCountInterceptor to work with ANY star graph computation that ends with count(). The concept is simple, just do flatMapToPair(traversal).count(). tada. This is so clean and pretty.... next is going to

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1288 2717b281e -> 6f9e5fa0c


Just generalized SparkCountInterceptor to work with ANY star graph computation that ends with count(). The concept is simple: just do flatMapToPair(traversal).count(). Tada. This is so clean and pretty... Next is going to be generalizing it to work with any ReducingBarrierStep, not just count(). Also, need to add some test cases that use sideEffects and ensure that Memory is updated accordingly. However, besides that, this is such a dead simple concept that I actually think this is how we should redesign SparkGraphComputer: break up the traversal into starGraphTraversals and then concatenate the jobs via Spark join()s.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/6f9e5fa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/6f9e5fa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/6f9e5fa0

Branch: refs/heads/TINKERPOP-1288
Commit: 6f9e5fa0c7ffa01b7fbd3c3fed9539f6ad12bd4c
Parents: 2717b28
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 11:26:00 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 11:26:00 2016 -0600

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       | 19 ++----
 .../optimization/SparkInterceptorStrategy.java  |  5 +-
 .../SparkPartitionAwareStrategy.java            |  4 +-
 .../interceptor/SparkCountInterceptor.java      | 72 +++++++-------------
 .../SparkInterceptorStrategyTest.java           |  8 +--
 5 files changed, 39 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6f9e5fa0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index d640e51..f6ed25d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -123,23 +123,12 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     }
 
     /**
-     * A helper method to yield a {@link Traversal} from the {@link Graph} and provided {@link Configuration}.
+     * Get the {@link PureTraversal} associated with the current instance of the {@link TraversalVertexProgram}.
      *
-     * @param graph         the graph that the traversal will run against
-     * @param configuration The configuration containing the TRAVERSAL key.
-     * @return the traversal supplied by the configuration
+     * @return the pure traversal of the instantiated program
      */
-    public static Traversal.Admin<?, ?> getTraversal(final Graph graph, final Configuration configuration) {
-        return VertexProgram.<TraversalVertexProgram>createVertexProgram(graph, configuration).traversal.get();
-    }
-
-    /**
-     * Get the {@link Traversal} associated with the current instance of the traversal vertex program.
-     *
-     * @return the traversal of the instantiated program
-     */
-    public Traversal.Admin<?, ?> getTraversal() {
-        return this.traversal.get();
+    public PureTraversal<?, ?> getTraversal() {
+        return this.traversal;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6f9e5fa0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
index b3d250a..89b831d 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategy.java
@@ -46,7 +46,10 @@ public final class SparkInterceptorStrategy extends AbstractTraversalStrategy<Tr
         final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance());
         final List<TraversalVertexProgramStep> steps = TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal);
         for (final TraversalVertexProgramStep step : steps) {
-            if (SparkCountInterceptor.isLegal(step.generateProgram(graph).getTraversal()))
+            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(graph).getTraversal().get().clone();
+            if (!computerTraversal.isLocked())
+                computerTraversal.applyStrategies();
+            if (SparkCountInterceptor.isLegal(computerTraversal))
                 step.setComputer(step.getComputer().configure(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR, SparkCountInterceptor.class.getCanonicalName()));
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6f9e5fa0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkPartitionAwareStrategy.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkPartitionAwareStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkPartitionAwareStrategy.java
index de3e737..a7380c4 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkPartitionAwareStrategy.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkPartitionAwareStrategy.java
@@ -59,7 +59,9 @@ public final class SparkPartitionAwareStrategy extends AbstractTraversalStrategy
     public void apply(final Traversal.Admin<?, ?> traversal) {
         final List<TraversalVertexProgramStep> steps = TraversalHelper.getStepsOfClass(TraversalVertexProgramStep.class, traversal);
         for (final TraversalVertexProgramStep step : steps) {
-            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(traversal.getGraph().orElse(EmptyGraph.instance())).getTraversal();
+            final Traversal.Admin<?, ?> computerTraversal = step.generateProgram(traversal.getGraph().orElse(EmptyGraph.instance())).getTraversal().get().clone();
+            if (!computerTraversal.isLocked())
+                computerTraversal.applyStrategies();
             boolean messagePasses = MESSAGEPASS_CLASSES.stream()
                     .flatMap(clazz -> TraversalHelper.<Step<?, ?>>getStepsOfAssignableClassRecursively((Class) clazz, computerTraversal).stream())
                     .filter(s -> TraversalHelper.isGlobalChild(((Step) s).getTraversal().asAdmin()))

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6f9e5fa0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
index 7ca4b61..6f91c42 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkCountInterceptor.java
@@ -24,22 +24,18 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
 import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
-import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
 
+import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -54,60 +50,40 @@ public final class SparkCountInterceptor implements SparkVertexProgramIntercepto
     @Override
     public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
         vertexProgram.setup(memory);
-        final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal();
-        JavaPairRDD<Object, VertexWritable> nextRDD = inputRDD;
-        long count = -1;
-        for (final Step step : traversal.getSteps()) {
-            if (step instanceof GraphStep) {
-                final Object[] ids = ((GraphStep) step).getIds();
-                if (ids.length > 0)
-                    nextRDD = nextRDD.filter(tuple -> ElementHelper.idExists(tuple._2().get().id(), ids));
-            } else if (step instanceof HasStep) {
-                final List<HasContainer> hasContainers = ((HasStep<?>) step).getHasContainers();
-                if (hasContainers.size() > 0)
-                    nextRDD = nextRDD.filter(tuple -> HasContainer.testAll(tuple._2().get(), hasContainers));
-            } else if (step instanceof PropertiesStep) {
-                final String[] keys = ((PropertiesStep) step).getPropertyKeys();
-                count = nextRDD.mapValues(vertex -> IteratorUtils.count(vertex.get().properties(keys))).fold(new Tuple2<>("sum", 0l), (a, b) -> new Tuple2<>("sum", a._2() + b._2()))._2();
-            } else if (step instanceof VertexStep) {
-                final String[] labels = ((VertexStep) step).getEdgeLabels();
-                final Direction direction = ((VertexStep) step).getDirection();
-                count = nextRDD.mapValues(vertex -> IteratorUtils.count(vertex.get().edges(direction, labels))).fold(new Tuple2<>("sum", 0l), (a, b) -> new Tuple2<>("sum", a._2() + b._2()))._2();
-            } else if (step instanceof CountGlobalStep) {
-                if (count == -1)
-                    count = nextRDD.count();
-            }
-        }
-        assert count != -1;
+        final Traversal.Admin<Vertex, Long> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
+        final Object[] graphStepIds = ((GraphStep) traversal.getStartStep()).getIds();
+        final CountGlobalStep countGlobalStep = (CountGlobalStep) traversal.getEndStep();
+        traversal.removeStep(0);                                    // remove GraphStep
+        traversal.removeStep(traversal.getSteps().size() - 1);      // remove CountGlobalStep
+        traversal.applyStrategies();                                // compile
+        boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
+        final long count = inputRDD
+                .filter(tuple -> ElementHelper.idExists(tuple._2().get().id(), graphStepIds))
+                .flatMapValues(vertexWritable -> {
+                    if (identityTraversal)                          // g.V.count()-style (identity)
+                        return () -> (Iterator) IteratorUtils.of(vertexWritable);
+                    else {                                          // add the vertex to head of the traversal
+                        final Traversal.Admin<Vertex, ?> clone = traversal.clone();
+                        clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), EmptyStep.instance(), 1l));
+                        return () -> clone;
+                    }
+                }).count();
+        // generate the HALTED_TRAVERSERS for the memory
         final TraverserSet<Long> haltedTraversers = new TraverserSet<>();
-        haltedTraversers.add(traversal.getTraverserGenerator().generate(count, (Step) traversal.getEndStep(), 1l));
+        haltedTraversers.add(traversal.getTraverserGenerator().generate(count, countGlobalStep, 1l));
         memory.set(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
-        memory.incrIteration();
+        memory.incrIteration(); // any local star graph reduction take a single iteration
         return inputRDD;
     }
 
     public static boolean isLegal(final Traversal.Admin<?, ?> traversal) {
-        if (!TraversalHelper.getLabels(traversal).isEmpty())
-            return false;
         final List<Step> steps = traversal.getSteps();
         if (!steps.get(0).getClass().equals(GraphStep.class) || ((GraphStep) steps.get(0)).returnsEdge())
             return false;
         if (!steps.get(steps.size() - 1).getClass().equals(CountGlobalStep.class))
             return false;
-        int vertexPropertiesStepCount = 0;
-        for (int i = 1; i < steps.size() - 1; i++) {
-            final Step<?, ?> step = steps.get(i);
-            final Class<? extends Step> stepClass = step.getClass();
-            if (!stepClass.equals(HasStep.class) && !stepClass.equals(PropertiesStep.class) && !stepClass.equals(VertexStep.class))
-                return false;
-            if ((stepClass.equals(VertexStep.class) || stepClass.equals(PropertiesStep.class)) && (++vertexPropertiesStepCount > 1 || !step.getNextStep().getClass().equals(CountGlobalStep.class)))
-                return false;
-            if (stepClass.equals(HasStep.class) && !step.getPreviousStep().getClass().equals(GraphStep.class) && !step.getPreviousStep().getClass().equals(HasStep.class))
-                return false;
-        }
-        return true;
+        return TraversalHelper.isLocalStarGraph(traversal);
 
     }
-
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6f9e5fa0/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index c1cf8cf..b2a4820 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@ -80,17 +80,17 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
         test(SparkCountInterceptor.class, 5l, g.V(1, 4).out("knows", "created").count());
         test(SparkCountInterceptor.class, 1l, g.V(2).in("knows").count());
         test(SparkCountInterceptor.class, 0l, g.V(6).has("name", "peter").in().count());
+        test(SparkCountInterceptor.class, 6l, g.V().as("a").values("name").as("b").count());
+        test(SparkCountInterceptor.class, 6l, g.V().as("a").count());
+        test(SparkCountInterceptor.class, 1l, g.V().has("name", "marko").as("a").values("name").as("b").count());
+        test(SparkCountInterceptor.class, 4l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age").out("created").count());
         /// No interceptor matches
         test(2l, g.V().out().out().count());
         test(6l, g.E().count());
-        test(6l, g.V().as("a").values("name").as("b").count());
-        test(6l, g.V().as("a").count());
-        test(1l, g.V().has("name", "marko").as("a").values("name").as("b").count());
         test(2l, g.V().out().out().count());
         test(6l, g.V().out().values("name").count());
         test(2l, g.V().out("knows").values("name").count());
         test(3l, g.V().in().has("name", "marko").count());
-        test(4l, g.V().has(T.label, P.not(P.within("robot", "android")).and(P.within("person", "software"))).hasLabel("person").has("age").out("created").count()); // TODO: filter(values()) should be okay.
     }
 
     private static <R> void test(Class<? extends VertexProgramInterceptor> expectedInterceptor, R expectedResult, final Traversal<?, R> traversal) throws Exception {