You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/01 01:08:40 UTC
incubator-tinkerpop git commit: minor tweak to the SparkGraphComputer
algorithm to mapValues() prior to emitting messages.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 63959c4ac -> 02b6083b8
Minor tweak to the SparkGraphComputer algorithm: apply mapValues() before emitting messages.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/02b6083b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/02b6083b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/02b6083b
Branch: refs/heads/master
Commit: 02b6083b828d5935d02a2473b37341693087352c
Parents: 63959c4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 31 17:08:37 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 31 17:08:37 2015 -0600
----------------------------------------------------------------------
.../hadoop/process/computer/spark/SparkExecutor.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/02b6083b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index fbbc9d0..72aa4d2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -105,16 +105,17 @@ public final class SparkExecutor {
// "message pass" by reducing on the vertex object id of the message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
final JavaPairRDD<Object, List<M>> incomingMessages = viewAndOutgoingMessagesRDD
- .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2()._2().iterator(), x -> {
+ .mapValues(Tuple2::_2)
+ .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2().iterator(), message -> {
final List<M> list = new ArrayList<>();
- list.add(x._2());
- return new Tuple2<>(x._1(), list);
+ list.add(message._2());
+ return new Tuple2<>(message._1(), list);
})).reduceByKey((a, b) -> {
if (null == messageCombiner) {
a.addAll(b);
return a;
} else {
- final M m = messageCombiner.combine(a.get(0),b.get(0));
+ final M m = messageCombiner.combine(a.get(0), b.get(0));
a.clear();
b.clear();
a.add(m);