You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/03 17:11:43 UTC
incubator-tinkerpop git commit: more comments on the Spark BSP algorithm.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 4748e2521 -> 88e9b9dac
more comments on the Spark BSP algorithm.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/88e9b9da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/88e9b9da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/88e9b9da
Branch: refs/heads/master
Commit: 88e9b9daca53f30b5a8c1b67742967c8e7495f37
Parents: 4748e25
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Apr 3 09:11:31 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Apr 3 09:11:41 2015 -0600
----------------------------------------------------------------------
.../gremlin/hadoop/process/computer/spark/SparkExecutor.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/88e9b9da/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 6d98423..c7a18dd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -77,19 +77,19 @@ public final class SparkExecutor {
graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices
.mapPartitionsToPair(partitionIterator -> {
- final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+ final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
workerVertexProgram.workerIterationStart(memory); // start the worker
return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
- final Vertex vertex = vertexViewIncoming._2()._1().get();
+ final Vertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property)); // attach the view to the vertex
///
final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
- workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex
+ workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex for this iteration
///
final List<DetachedVertexProperty<Object>> nextView = null == elementComputeKeysArray ? // not all vertex programs have compute keys
Collections.emptyList() :
@@ -107,7 +107,7 @@ public final class SparkExecutor {
.flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))))) // emit the outgoing message payloads one by one
- .reduceByKey((a, b) -> { // reduce the view and incoming messages into a single payload object representing the new view and incoming messages for a vertex
+ .reduceByKey((a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
if (a instanceof ViewIncomingPayload) {
((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
return a;