You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/01 01:08:40 UTC

incubator-tinkerpop git commit: Minor tweak to the SparkGraphComputer algorithm: call mapValues() prior to emitting messages.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 63959c4ac -> 02b6083b8


Minor tweak to the SparkGraphComputer algorithm: call mapValues() prior to emitting messages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/02b6083b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/02b6083b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/02b6083b

Branch: refs/heads/master
Commit: 02b6083b828d5935d02a2473b37341693087352c
Parents: 63959c4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 31 17:08:37 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 31 17:08:37 2015 -0600

----------------------------------------------------------------------
 .../hadoop/process/computer/spark/SparkExecutor.java        | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/02b6083b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index fbbc9d0..72aa4d2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -105,16 +105,17 @@ public final class SparkExecutor {
         // "message pass" by reducing on the vertex object id of the message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
         final JavaPairRDD<Object, List<M>> incomingMessages = viewAndOutgoingMessagesRDD
-                .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2()._2().iterator(), x -> {
+                .mapValues(Tuple2::_2)
+                .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2().iterator(), message -> {
                     final List<M> list = new ArrayList<>();
-                    list.add(x._2());
-                    return new Tuple2<>(x._1(), list);
+                    list.add(message._2());
+                    return new Tuple2<>(message._1(), list);
                 })).reduceByKey((a, b) -> {
                     if (null == messageCombiner) {
                         a.addAll(b);
                         return a;
                     } else {
-                        final M m = messageCombiner.combine(a.get(0),b.get(0));
+                        final M m = messageCombiner.combine(a.get(0), b.get(0));
                         a.clear();
                         b.clear();
                         a.add(m);