You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/01 22:15:41 UTC
incubator-tinkerpop git commit: more code tweaks to Spark BSP
algorithm -- minor nothings.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 6e06979da -> 40f5f625f
more code tweaks to Spark BSP algorithm -- minor nothings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/40f5f625
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/40f5f625
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/40f5f625
Branch: refs/heads/master
Commit: 40f5f625f35e6713c27e52a97536fad686809290
Parents: 6e06979
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Apr 1 14:15:38 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Apr 1 14:15:38 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/40f5f625/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 86b59d1..65a75c1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -77,22 +77,23 @@ public final class SparkExecutor {
final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
workerVertexProgram.workerIterationStart(memory); // start the worker
- return () -> IteratorUtils.map(partitionIterator, vertexWritableAndIncomingMessages -> {
- final Vertex vertex = vertexWritableAndIncomingMessages._2()._1().get();
- final boolean hasViewAndMessages = vertexWritableAndIncomingMessages._2()._2().isPresent();
- final List<M> incomingMessages = hasViewAndMessages ? vertexWritableAndIncomingMessages._2()._2().get()._2() : Collections.emptyList();
- final List<DetachedVertexProperty<Object>> view = hasViewAndMessages ? vertexWritableAndIncomingMessages._2()._2().get()._1() : Collections.emptyList();
+ return () -> IteratorUtils.map(partitionIterator, vertexViewAndMessages -> {
+ final Vertex vertex = vertexViewAndMessages._2()._1().get();
+ final boolean hasViewAndMessages = vertexViewAndMessages._2()._2().isPresent();
+ final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._1() : Collections.emptyList();
+ final List<M> incomingMessages = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._2() : Collections.emptyList();
+ previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property)); // attach the view to the vertex
///
- view.forEach(property -> DetachedVertexProperty.addTo(vertex, property)); // attach the view to the vertex
final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex
- final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
- final List<DetachedVertexProperty<Object>> newView = null == elementComputeKeysArray ? // not all vertex programs have compute keys
+ ///
+ final List<DetachedVertexProperty<Object>> nextView = null == elementComputeKeysArray ? // not all vertex programs have compute keys
Collections.emptyList() :
IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
+ final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
if (!partitionIterator.hasNext())
workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
- return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
+ return new Tuple2<>(vertex.id(), new Tuple2<>(nextView, outgoingMessages));
});
});