You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/03 17:11:43 UTC

incubator-tinkerpop git commit: more comments on the Spark BSP algorithm.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 4748e2521 -> 88e9b9dac


more comments on the Spark BSP algorithm.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/88e9b9da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/88e9b9da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/88e9b9da

Branch: refs/heads/master
Commit: 88e9b9daca53f30b5a8c1b67742967c8e7495f37
Parents: 4748e25
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Apr 3 09:11:31 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Apr 3 09:11:41 2015 -0600

----------------------------------------------------------------------
 .../gremlin/hadoop/process/computer/spark/SparkExecutor.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/88e9b9da/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 6d98423..c7a18dd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -77,19 +77,19 @@ public final class SparkExecutor {
                 graphRDD.leftOuterJoin(viewIncomingRDD))                                                                        // every other iteration may have views and messages
                 // for each partition of vertices
                 .mapPartitionsToPair(partitionIterator -> {
-                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
                     final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
                     workerVertexProgram.workerIterationStart(memory); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
-                        final Vertex vertex = vertexViewIncoming._2()._1().get();
+                        final Vertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property));  // attach the view to the vertex
                         ///
                         final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
-                        workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex
+                        workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex for this iteration
                         ///
                         final List<DetachedVertexProperty<Object>> nextView = null == elementComputeKeysArray ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
@@ -107,7 +107,7 @@ public final class SparkExecutor {
                 .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
-                .reduceByKey((a, b) -> {      // reduce the view and incoming messages into a single payload object representing the new view and incoming messages for a vertex
+                .reduceByKey((a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
                     if (a instanceof ViewIncomingPayload) {
                         ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
                         return a;