You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/01 22:15:41 UTC

incubator-tinkerpop git commit: more code tweaks to Spark BSP algorithm -- minor nothings.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 6e06979da -> 40f5f625f


more code tweaks to Spark BSP algorithm -- minor nothings (local variable renames and statement reordering in SparkExecutor).


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/40f5f625
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/40f5f625
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/40f5f625

Branch: refs/heads/master
Commit: 40f5f625f35e6713c27e52a97536fad686809290
Parents: 6e06979
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Apr 1 14:15:38 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Apr 1 14:15:38 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java    | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/40f5f625/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 86b59d1..65a75c1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -77,22 +77,23 @@ public final class SparkExecutor {
                     final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
                     final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
                     workerVertexProgram.workerIterationStart(memory); // start the worker
-                    return () -> IteratorUtils.map(partitionIterator, vertexWritableAndIncomingMessages -> {
-                        final Vertex vertex = vertexWritableAndIncomingMessages._2()._1().get();
-                        final boolean hasViewAndMessages = vertexWritableAndIncomingMessages._2()._2().isPresent();
-                        final List<M> incomingMessages = hasViewAndMessages ? vertexWritableAndIncomingMessages._2()._2().get()._2() : Collections.emptyList();
-                        final List<DetachedVertexProperty<Object>> view = hasViewAndMessages ? vertexWritableAndIncomingMessages._2()._2().get()._1() : Collections.emptyList();
+                    return () -> IteratorUtils.map(partitionIterator, vertexViewAndMessages -> {
+                        final Vertex vertex = vertexViewAndMessages._2()._1().get();
+                        final boolean hasViewAndMessages = vertexViewAndMessages._2()._2().isPresent();
+                        final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._1() : Collections.emptyList();
+                        final List<M> incomingMessages = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._2() : Collections.emptyList();
+                        previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property));  // attach the view to the vertex
                         ///
-                        view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));  // attach the view to the vertex
                         final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex
-                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
-                        final List<DetachedVertexProperty<Object>> newView = null == elementComputeKeysArray ?  // not all vertex programs have compute keys
+                        ///
+                        final List<DetachedVertexProperty<Object>> nextView = null == elementComputeKeysArray ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
                                 IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
+                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
-                        return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
+                        return new Tuple2<>(vertex.id(), new Tuple2<>(nextView, outgoingMessages));
                     });
                 });