You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/03 22:53:25 UTC

[7/9] incubator-tinkerpop git commit: syncing with 1120.

syncing with 1120.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e8bba359
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e8bba359
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e8bba359

Branch: refs/heads/TINKERPOP-1288
Commit: e8bba359be7a773a138b023929c0a2c02a9d6de8
Parents: 77480a2
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue May 3 14:48:27 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue May 3 14:48:27 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 138 ++++++++++---------
 1 file changed, 70 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e8bba359/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index aadb70d..3e6a816 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -21,8 +21,6 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
 import com.google.common.base.Optional;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -62,7 +60,7 @@ public final class SparkExecutor {
     // DATA LOADING //
     //////////////////
 
-    public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
+    public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
         return graphRDD.mapPartitionsToPair(partitionIterator -> {
             final GraphFilter gFilter = graphFilter.clone();
             return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
@@ -80,17 +78,16 @@ public final class SparkExecutor {
             final SparkMemory memory,
             final Configuration apacheConfiguration) {
 
-        final boolean partitionedGraphRDD = graphRDD.partitioner().isPresent();
-        if (partitionedGraphRDD && null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
+        if (null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
             assert graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get());
-        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
+        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
                 graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
                 graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
                 // for each partition of vertices emit a view and their outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
                     HadoopPools.initialize(apacheConfiguration);
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
-                    final String[] elementComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
+                    final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
@@ -98,70 +95,67 @@ public final class SparkExecutor {
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
                         // revive compute properties if they already exist
-                        if (memory.isInitialIteration() && elementComputeKeysArray.length > 0) {
-                            vertex.properties(elementComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
-                        }
+                        if (memory.isInitialIteration() && vertexComputeKeysArray.length > 0)
+                            vertex.properties(vertexComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
                         // drop any computed properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0)
-                            vertex.dropVertexProperties(elementComputeKeysArray);
+                        vertex.dropVertexProperties(vertexComputeKeysArray);
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
-                        previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
-                        // previousView.clear(); // no longer needed so kill it from memory
-                        ///
+                        IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
+                        assert previousView.isEmpty();
+                        // do the vertex's vertex program iteration
                         messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
-                        // incomingMessages.clear(); // no longer needed so kill it from memory
-                        ///
-                        final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
+                        // assert incomingMessages.isEmpty();  // maybe the program didn't read all the messages
+                        incomingMessages.clear();
+                        // detach the compute property view from the vertex
+                        final List<DetachedVertexProperty<Object>> nextView = vertexComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
-                                IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
-                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+                                IteratorUtils.list(IteratorUtils.map(vertex.properties(vertexComputeKeysArray), vertexProperty -> DetachedFactory.detach(vertexProperty, true)));
+                        // drop compute property view as it has now been detached from the vertex
+                        vertex.dropVertexProperties(vertexComputeKeysArray);
+                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages being sent by this vertex
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
-                        return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+                        return (nextView.isEmpty() && outgoingMessages.isEmpty()) ?
+                                null : // if there is neither a view nor outgoing messages, emit nothing
+                                new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));  // else, emit the vertex id, its view, and its outgoing messages
                     });
-                }, true)); // true means that the partition is preserved
+                }, true)  // true means that the partition is preserved
+                .filter(tuple -> null != tuple); // if there are no messages or views, then the tuple is null (memory optimization)
         // the graphRDD and the viewRDD must have the same partitioner
-        if (partitionedGraphRDD)
-            assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
+        assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
         // "message pass" by reducing on the vertex object id of the view and message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
-
-        /////////////////////////////////////////////////////////////
-        /////////////////////////////////////////////////////////////
-        final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
-                tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
-                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
-                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
-        final Function2<Payload, Payload, Payload> reducerFunction = (a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
-            if (a instanceof ViewIncomingPayload) {
-                ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
-                return a;
-            } else if (b instanceof ViewIncomingPayload) {
-                ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
-                return b;
-            } else {
-                final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
-                c.mergePayload(a, messageCombiner);
-                c.mergePayload(b, messageCombiner);
-                return c;
-            }
-        };
-        /////////////////////////////////////////////////////////////
-        /////////////////////////////////////////////////////////////
-
-        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD =
-                (partitionedGraphRDD ?
-                        viewOutgoingRDD.flatMapToPair(messageFunction).reduceByKey(graphRDD.partitioner().get(), reducerFunction) :
-                        viewOutgoingRDD.flatMapToPair(messageFunction).reduceByKey(reducerFunction))
-                        .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
-                        .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
-                        .mapValues(payload -> payload instanceof ViewIncomingPayload ?
-                                (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
-                                new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
+        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
+                .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+                        // emit the view payload
+                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
+                        // emit the outgoing message payloads one by one
+                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
+                .reduceByKey(graphRDD.partitioner().get(), (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+                    if (a instanceof ViewIncomingPayload) {
+                        ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
+                        return a;
+                    } else if (b instanceof ViewIncomingPayload) {
+                        ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
+                        return b;
+                    } else {
+                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
+                        c.mergePayload(a, messageCombiner);
+                        c.mergePayload(b, messageCombiner);
+                        return c;
+                    }
+                })
+                .mapValues(payload -> { // handle various corner cases of when views don't exist, messages don't exist, or neither exists.
+                    if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex view with incoming messages
+                        return (ViewIncomingPayload<M>) payload;
+                    else if (payload instanceof ViewPayload)    // this happens if there is a vertex view with no incoming messages
+                        return new ViewIncomingPayload<>((ViewPayload) payload);
+                    else                                        // this happens when there is a single message to a vertex that has no view or outgoing messages
+                        return new ViewIncomingPayload<>((MessagePayload<M>) payload);
+                });
         // the graphRDD and the viewRDD must have the same partitioner
-        if (partitionedGraphRDD)
-            assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
+        assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
                     HadoopPools.initialize(apacheConfiguration);
@@ -169,17 +163,20 @@ public final class SparkExecutor {
         return newViewIncomingRDD;
     }
 
-    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final Set<VertexComputeKey> vertexComputeKeys) {
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
+            final JavaPairRDD<Object, VertexWritable> graphRDD,
+            final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
+            final Set<VertexComputeKey> vertexComputeKeys) {
         // the graphRDD and the viewRDD must have the same partitioner
-        if (graphRDD.partitioner().isPresent())
-            assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
-        // attach the final computed view to the cached graph
+        assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
+        final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
         return graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {
                     final StarGraph.StarVertex vertex = tuple._1().get();
+                    vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
+                    // attach the final computed view to the cached graph
                     final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
                     for (final DetachedVertexProperty<Object> property : view) {
-                        vertex.dropVertexProperties(property.key());
                         if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
                             property.attach(Attachable.Method.create(vertex));
                     }
@@ -191,7 +188,9 @@ public final class SparkExecutor {
     // MAP REDUCE //
     ////////////////
 
-    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+    public static <K, V> JavaPairRDD<K, V> executeMap(
+            final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
+            final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
@@ -201,14 +200,17 @@ public final class SparkExecutor {
         return mapRDD;
     }
 
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD, final Configuration apacheConfiguration) {
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
+                                                                    final Configuration apacheConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
         });
     }
 
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
+            final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
+            final Configuration apacheConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
@@ -217,4 +219,4 @@ public final class SparkExecutor {
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
         return reduceRDD;
     }
-}
+}
\ No newline at end of file