You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 18:55:29 UTC

[5/8] incubator-tinkerpop git commit: finally figured out how to do a reduceByKey() with empty tuples. This is the super optimization -- if there are no views and no outgoing messages, then the reduceByKey is trivially complex. For TraversalVertexProgram

finally figured out how to do a reduceByKey() with empty tuples. This is the super optimization -- if there are no views and no outgoing messages, then the reduceByKey is trivially complex. For TraversalVertexProgram, this means that the final step takes no time at all. Running integration tests overnight.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/79ebaf9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/79ebaf9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/79ebaf9f

Branch: refs/heads/master
Commit: 79ebaf9f94f0b645ba493551ff219a786003cc85
Parents: 6f13c0c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon May 2 19:03:11 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon May 2 19:03:11 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 51 ++++++++++++--------
 .../computer/payload/ViewIncomingPayload.java   |  5 ++
 2 files changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79ebaf9f/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 520701e..3ebcb01 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -88,7 +88,7 @@ public final class SparkExecutor {
                 .mapPartitionsToPair(partitionIterator -> {
                     HadoopPools.initialize(apacheConfiguration);
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
-                    final String[] elementComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
+                    final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
@@ -96,10 +96,10 @@ public final class SparkExecutor {
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
                         // revive compute properties if they already exist
-                        if (memory.isInitialIteration() && elementComputeKeysArray.length > 0)
-                            vertex.properties(elementComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
+                        if (memory.isInitialIteration() && vertexComputeKeysArray.length > 0)
+                            vertex.properties(vertexComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
                         // drop any computed properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
+                        if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
                         assert previousView.isEmpty();
@@ -109,21 +109,22 @@ public final class SparkExecutor {
                         // assert incomingMessages.isEmpty();  // maybe the program didn't read all the messages
                         incomingMessages.clear();
                         ///
-                        final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
+                        final List<DetachedVertexProperty<Object>> nextView = vertexComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
                                 IteratorUtils.list(IteratorUtils.map(
                                         IteratorUtils.filter(
-                                                vertex.properties(elementComputeKeysArray),
+                                                vertex.properties(vertexComputeKeysArray),
                                                 VertexProperty::isPresent),
                                         property -> DetachedFactory.detach(property, true)));
                         // drop any computed properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
+                        if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
                         final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
-                        return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+                        return (nextView.isEmpty() && outgoingMessages.isEmpty()) ? null : new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
                     });
-                }, true); // true means that the partition is preserved
+                }, true)  // true means that the partition is preserved
+                .filter(tuple -> null != tuple); // if there are no messages or views, then the tuple is null (memory optimization)
         // the graphRDD and the viewRDD must have the same partitioner
         assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
         // "message pass" by reducing on the vertex object id of the view and message payloads
@@ -146,11 +147,14 @@ public final class SparkExecutor {
                         return c;
                     }
                 })
-                .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
-                .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
-                .mapValues(payload -> payload instanceof ViewIncomingPayload ?
-                        (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
-                        new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
+                .mapValues(payload -> {
+                    if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex with incoming messages
+                        return (ViewIncomingPayload<M>) payload;
+                    else if (payload instanceof ViewPayload)    // this happens if there is a vertex with no incoming messages
+                        return new ViewIncomingPayload<>((ViewPayload) payload);
+                    else                                        // this happens when there is a single message to a vertex that has no view or outgoing messages
+                        return new ViewIncomingPayload<>((MessagePayload<M>) payload);
+                });
         // the graphRDD and the viewRDD must have the same partitioner
         assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
@@ -160,16 +164,20 @@ public final class SparkExecutor {
         return newViewIncomingRDD;
     }
 
-    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final Set<VertexComputeKey> vertexComputeKeys) {
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
+            final JavaPairRDD<Object, VertexWritable> graphRDD,
+            final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
+            final Set<VertexComputeKey> vertexComputeKeys) {
         // the graphRDD and the viewRDD must have the same partitioner
         assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
         // attach the final computed view to the cached graph
+        final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
         return graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {
                     final StarGraph.StarVertex vertex = tuple._1().get();
+                    vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
                     final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
                     for (final DetachedVertexProperty<Object> property : view) {
-                        vertex.dropVertexProperties(property.key());
                         if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
                             property.attach(Attachable.Method.create(vertex));
                     }
@@ -181,7 +189,9 @@ public final class SparkExecutor {
     // MAP REDUCE //
     ////////////////
 
-    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+    public static <K, V> JavaPairRDD<K, V> executeMap(
+            final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
+            final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
@@ -191,14 +201,17 @@ public final class SparkExecutor {
         return mapRDD;
     }
 
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD, final Configuration apacheConfiguration) {
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
+                                                                    final Configuration apacheConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
         });
     }
 
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
+            final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
+            final Configuration apacheConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79ebaf9f/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
index 4c4850b..904583e 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
@@ -49,6 +49,11 @@ public final class ViewIncomingPayload<M> implements Payload {
             this.view = null;
     }
 
+    public ViewIncomingPayload(final MessagePayload<M> messagePayload) {
+        this.incomingMessages = new ArrayList<>();
+        this.incomingMessages.add(messagePayload.getMessage());
+    }
+
 
     public List<DetachedVertexProperty<Object>> getView() {
         return null == this.view ? Collections.emptyList() : this.view;