Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/03 15:49:21 UTC

incubator-tinkerpop git commit: some last-minute cleanups and comments before the PR. Integration tests passed overnight. Spark integration tests passed for these changes just now.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1120 79ebaf9f9 -> 8fd950216


some last-minute cleanups and comments before the PR. Integration tests passed overnight. Spark integration tests passed for these changes just now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8fd95021
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8fd95021
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8fd95021

Branch: refs/heads/TINKERPOP-1120
Commit: 8fd9502160b7940a806247a16406663ff4b27826
Parents: 79ebaf9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue May 3 07:49:14 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue May 3 07:49:14 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 41 ++++++++++----------
 1 file changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8fd95021/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 3ebcb01..c216510 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -36,7 +36,6 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -61,7 +60,7 @@ public final class SparkExecutor {
     // DATA LOADING //
     //////////////////
 
-    public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
+    public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
         return graphRDD.mapPartitionsToPair(partitionIterator -> {
             final GraphFilter gFilter = graphFilter.clone();
             return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
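The change above only finalizes the graphRDD parameter, but the method body it sits in is worth a note: the GraphFilter is cloned once per partition, so each Spark task filters with its own private copy of a stateful object instead of a shared driver-side instance. Below is a minimal sketch of that clone-per-partition pattern in plain JDK Java (no Spark); StatefulFilter is a hypothetical stand-in for GraphFilter, not a TinkerPop type.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Spliterators;
    import java.util.function.Predicate;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public class PartitionCloneSketch {

        // hypothetical stand-in for GraphFilter: cloneable because it carries mutable state
        static class StatefulFilter implements Cloneable, Predicate<String> {
            private long tested = 0;  // state that must stay private to one partition

            @Override
            public boolean test(final String element) {
                this.tested++;
                return !element.isEmpty();
            }

            @Override
            public StatefulFilter clone() {
                try {
                    return (StatefulFilter) super.clone();
                } catch (final CloneNotSupportedException e) {
                    throw new IllegalStateException(e);
                }
            }
        }

        // analogous to applyGraphFilter: clone once per partition, then filter lazily
        static Stream<String> filterPartition(final Iterator<String> partition, final StatefulFilter filter) {
            final StatefulFilter localFilter = filter.clone();  // one private copy per partition
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(partition, 0), false)
                    .filter(localFilter);
        }

        public static void main(final String[] args) {
            final Iterator<String> partition = Arrays.asList("marko", "", "vadas").iterator();
            filterPartition(partition, new StatefulFilter()).forEach(System.out::println);  // marko, vadas
        }
    }
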
@@ -99,29 +98,27 @@ public final class SparkExecutor {
                         if (memory.isInitialIteration() && vertexComputeKeysArray.length > 0)
                             vertex.properties(vertexComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
                         // drop any computed properties that are cached in memory
-                        if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
+                        vertex.dropVertexProperties(vertexComputeKeysArray);
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
                         assert previousView.isEmpty();
-                        ///
+                        // execute this iteration of the vertex program for the vertex
                         messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
                         // assert incomingMessages.isEmpty();  // maybe the program didn't read all the messages
                         incomingMessages.clear();
-                        ///
+                        // detach the compute property view from the vertex
                         final List<DetachedVertexProperty<Object>> nextView = vertexComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
-                                IteratorUtils.list(IteratorUtils.map(
-                                        IteratorUtils.filter(
-                                                vertex.properties(vertexComputeKeysArray),
-                                                VertexProperty::isPresent),
-                                        property -> DetachedFactory.detach(property, true)));
-                        // drop any computed properties that are cached in memory
-                        if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
-                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+                                IteratorUtils.list(IteratorUtils.map(vertex.properties(vertexComputeKeysArray), vertexProperty -> DetachedFactory.detach(vertexProperty, true)));
+                        // drop the compute property view now that it has been detached from the vertex
+                        vertex.dropVertexProperties(vertexComputeKeysArray);
+                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages being sent by this vertex
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
-                        return (nextView.isEmpty() && outgoingMessages.isEmpty()) ? null : new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+                        return (nextView.isEmpty() && outgoingMessages.isEmpty()) ?
+                                null : // if there is no view nor outgoing messages, emit nothing
+                                new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));  // else, emit the vertex id, its view, and its outgoing messages
                     });
                 }, true)  // true means that the partition is preserved
                 .filter(tuple -> null != tuple); // if there are no messages or views, then the tuple is null (memory optimization)
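The restructured return statement above makes the memory optimization explicit, and the trailing .filter() is the other half of it: a vertex that produced neither a view nor outgoing messages emits null instead of an empty payload object, and the nulls are dropped in a single pass. Here is a minimal sketch of the same emit-null-then-filter idiom with plain Java streams (not Spark); the ids and payload strings are hypothetical stand-ins for the vertex ids and ViewOutgoingPayload objects.

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public class NullEmissionSketch {
        public static void main(final String[] args) {
            final List<SimpleEntry<Long, String>> emitted = Arrays.asList(1L, 2L, 3L, 4L).stream()
                    .map(id -> (id % 2 == 0)
                            ? null  // nothing to emit for this vertex: null is cheaper than an empty payload
                            : new SimpleEntry<Long, String>(id, "view+messages of " + id))
                    .filter(Objects::nonNull)  // mirrors .filter(tuple -> null != tuple) above
                    .collect(Collectors.toList());
            System.out.println(emitted);  // [1=view+messages of 1, 3=view+messages of 3]
        }
    }
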
@@ -131,9 +128,11 @@ public final class SparkExecutor {
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
         final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
                 .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
-                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
-                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
-                .reduceByKey(graphRDD.partitioner().get(), (a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+                        // emit the view payload
+                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
+                        // emit the outgoing message payloads one by one
+                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
+                .reduceByKey(graphRDD.partitioner().get(), (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
                     if (a instanceof ViewIncomingPayload) {
                         ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
                         return a;
@@ -147,10 +146,10 @@ public final class SparkExecutor {
                         return c;
                     }
                 })
-                .mapValues(payload -> {
-                    if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex with incoming messages
+                .mapValues(payload -> { // normalize the corner cases: a view with incoming messages, a view with no messages, or a message for a vertex with no view
+                    if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex view with incoming messages
                         return (ViewIncomingPayload<M>) payload;
-                    else if (payload instanceof ViewPayload)    // this happens if there is a vertex with no incoming messages
+                    else if (payload instanceof ViewPayload)    // this happens if there is a vertex view with no incoming messages
                         return new ViewIncomingPayload<>((ViewPayload) payload);
                     else                                        // this happens when there is a single message to a vertex that has no view or outgoing messages
                         return new ViewIncomingPayload<>((MessagePayload<M>) payload);
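The comments in the two hunks above describe a reduce-then-normalize pipeline: reduceByKey folds every view and message payload addressed to a vertex into one object, and mapValues then converts whichever payload type survived into a uniform ViewIncomingPayload. Below is a minimal sketch of the reducer's type-dispatch logic in plain Java; the Payload classes are hypothetical stand-ins for TinkerPop's MessagePayload and ViewIncomingPayload, and the message combiner is omitted.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.BinaryOperator;

    public class PayloadMergeSketch {

        interface Payload {}

        // hypothetical stand-in for MessagePayload: a single message addressed to a vertex
        static class MessagePayload implements Payload {
            final String message;
            MessagePayload(final String message) { this.message = message; }
        }

        // hypothetical stand-in for ViewIncomingPayload: the merged result for one vertex
        static class IncomingPayload implements Payload {
            final List<String> messages = new ArrayList<>();
            void mergePayload(final Payload other) {
                if (other instanceof MessagePayload)
                    this.messages.add(((MessagePayload) other).message);
                else if (other instanceof IncomingPayload)
                    this.messages.addAll(((IncomingPayload) other).messages);
            }
        }

        // mirrors the reduceByKey function above: reuse an already-merged payload when one
        // side has one (saving an allocation), otherwise start a fresh merged payload
        static final BinaryOperator<Payload> REDUCER = (a, b) -> {
            if (a instanceof IncomingPayload) {
                ((IncomingPayload) a).mergePayload(b);
                return a;
            } else if (b instanceof IncomingPayload) {
                ((IncomingPayload) b).mergePayload(a);
                return b;
            } else {
                final IncomingPayload c = new IncomingPayload();
                c.mergePayload(a);
                c.mergePayload(b);
                return c;
            }
        };

        public static void main(final String[] args) {
            Payload reduced = new MessagePayload("m1");
            for (final Payload next : Arrays.<Payload>asList(new MessagePayload("m2"), new MessagePayload("m3")))
                reduced = REDUCER.apply(reduced, next);
            System.out.println(((IncomingPayload) reduced).messages);  // [m1, m2, m3]
        }
    }
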
@@ -170,12 +169,12 @@ public final class SparkExecutor {
             final Set<VertexComputeKey> vertexComputeKeys) {
         // the graphRDD and the viewRDD must have the same partitioner
         assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
-        // attach the final computed view to the cached graph
         final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
         return graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {
                     final StarGraph.StarVertex vertex = tuple._1().get();
                     vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
+                    // attach the final computed view to the cached graph
                     final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
                     for (final DetachedVertexProperty<Object> property : view) {
                         if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
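The last hunk simply moves the "attach the final computed view" comment next to the loop it describes. The surrounding method is the standard write-back pattern: a left outer join keeps every vertex of the cached graph, attaches a computed view where one exists, and skips transient compute keys so they never reach the final graph. Below is a minimal sketch of that join-and-attach step with plain Java maps standing in for the two RDDs; all names, keys, and the key=value property encoding are hypothetical.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class ViewAttachSketch {
        public static void main(final String[] args) {
            final Map<Long, List<String>> graph = new HashMap<>();  // vertex id -> properties
            graph.put(1L, new ArrayList<>(Arrays.asList("name=marko")));
            graph.put(2L, new ArrayList<>(Arrays.asList("name=vadas")));

            final Map<Long, List<String>> views = new HashMap<>();  // vertex id -> computed view
            views.put(1L, Arrays.asList("pageRank=0.15", "gremlin.scratch=tmp"));  // one transient key

            final Set<String> transientKeys = Collections.singleton("gremlin.scratch");

            for (final Map.Entry<Long, List<String>> vertex : graph.entrySet()) {
                // the "left outer" part: a vertex with no view proceeds with an empty one, mirroring
                // tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList()
                for (final String property : views.getOrDefault(vertex.getKey(), Collections.emptyList())) {
                    final String key = property.substring(0, property.indexOf('='));
                    if (!transientKeys.contains(key))
                        vertex.getValue().add(property);  // attach only non-transient computed values
                }
            }
            System.out.println(graph);  // vertex 1 gains pageRank=0.15; gremlin.scratch is dropped
        }
    }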