You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/02 20:58:58 UTC

incubator-tinkerpop git commit: reduce() in Spark happens locally and then globally, unlike Hadoop, where the local reduce step is called combine. Fixed a bug in message aggregation for the Spark message-passing algorithm.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master b441c7ed9 -> e9a2e410c


reduce() in Spark happens locally and then globally, unlike Hadoop, where the local reduce step is called combine. Fixed a bug in message aggregation for the Spark message-passing algorithm.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e9a2e410
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e9a2e410
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e9a2e410

Branch: refs/heads/master
Commit: e9a2e410c9ef865223bfb0a7f89224a62d11fd5a
Parents: b441c7e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 12:58:57 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 12:58:57 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java   | 36 ++++--------------
 .../spark/payload/ViewIncomingPayload.java      | 39 +++++++++++++++++---
 2 files changed, 41 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e9a2e410/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index f86c311..bb908aa 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -109,43 +109,21 @@ public final class SparkExecutor {
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
                 .reduceByKey((a, b) -> {
                     if (a instanceof ViewIncomingPayload) {
-                        if (b instanceof MessagePayload)
-                            ((ViewIncomingPayload<M>) a).addIncomingMessage(((MessagePayload<M>) b).getMessage(), messageCombiner);
-                        else if (b instanceof ViewPayload)
-                            ((ViewIncomingPayload<M>) a).setView(((ViewPayload) b).getView());
-                        //else if (b instanceof ViewIncomingPayload)
-                        //    throw new IllegalStateException("It should never be the case that two views reduce to the same key");
+                        ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
                         return a;
                     } else if (b instanceof ViewIncomingPayload) {
-                        if (a instanceof MessagePayload)
-                            ((ViewIncomingPayload<M>) b).addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
-                        else if (a instanceof ViewPayload)
-                            ((ViewIncomingPayload<M>) b).setView(((ViewPayload) a).getView());
-                        //else if (a instanceof ViewIncomingPayload)
-                        //    throw new IllegalStateException("It should never be the case that two views reduce to the same key");
+                        ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
                         return b;
                     } else {
                         final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
-                        if (a instanceof MessagePayload)
-                            c.addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
-                        else if (a instanceof ViewPayload)
-                            c.setView(((ViewPayload) a).getView());
-                        if (b instanceof MessagePayload)
-                            c.addIncomingMessage(((MessagePayload<M>) b).getMessage(), messageCombiner);
-                        else if (b instanceof ViewPayload)
-                            c.setView(((ViewPayload) b).getView());
+                        c.mergePayload(a, messageCombiner);
+                        c.mergePayload(b, messageCombiner);
                         return c;
                     }
                 })
-                .mapValues(payload -> {
-                    if (payload instanceof ViewIncomingPayload)
-                        return (ViewIncomingPayload<M>) payload;
-                    else {  // this means the vertex has no incoming messages
-                        final ViewIncomingPayload<M> viewIncomingPayload = new ViewIncomingPayload<>();
-                        viewIncomingPayload.setView(((ViewPayload) payload).getView());
-                        return viewIncomingPayload;
-                    }
-                });
+                .mapValues(payload -> payload instanceof ViewIncomingPayload ?
+                        (ViewIncomingPayload<M>) payload :
+                        new ViewIncomingPayload<>((ViewPayload) payload));
 
         newViewIncomingRDD.foreachPartition(partitionIterator -> {
         }); // need to complete a task so its BSP and the memory for this iteration is updated

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e9a2e410/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
index 2de9736..772052e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
@@ -36,30 +36,59 @@ public class ViewIncomingPayload<M> implements Payload {
     private List<DetachedVertexProperty<Object>> view = null;
     private final List<M> incomingMessages;
 
+
+    public ViewIncomingPayload() {
+        this.incomingMessages = null;
+    }
+
     public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
         this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
     }
 
-    public ViewIncomingPayload()  {
+    public ViewIncomingPayload(final ViewPayload viewPayload) {
         this.incomingMessages = null;
+        this.view = viewPayload.getView();
     }
 
+
     public List<DetachedVertexProperty<Object>> getView() {
         return null == this.view ? Collections.emptyList() : this.view;
     }
 
-    public void setView(final List<DetachedVertexProperty<Object>> view) {
-        this.view = view;
-    }
 
     public List<M> getIncomingMessages() {
         return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
     }
 
-    public void addIncomingMessage(final M message, final MessageCombiner<M> messageCombiner) {
+    ////////////////////
+
+
+    private void mergeMessage(final M message, final MessageCombiner<M> messageCombiner) {
         if (this.incomingMessages.isEmpty() || null == messageCombiner)
             this.incomingMessages.add(message);
         else
             this.incomingMessages.set(0, messageCombiner.combine(this.incomingMessages.get(0), message));
     }
+
+    private void mergeViewIncomingPayload(final ViewIncomingPayload<M> viewIncomingPayload, final MessageCombiner<M> messageCombiner) {
+        if (this.view == null && !viewIncomingPayload.getView().isEmpty())
+            this.view = viewIncomingPayload.getView();
+        else
+            this.view.addAll(viewIncomingPayload.getView());
+
+        for (final M message : viewIncomingPayload.getIncomingMessages()) {
+            this.mergeMessage(message, messageCombiner);
+        }
+    }
+
+    public void mergePayload(final Payload payload, final MessageCombiner<M> messageCombiner) {
+        if (payload instanceof ViewPayload)
+            this.view = ((ViewPayload) payload).getView();
+        else if (payload instanceof MessagePayload)
+            this.mergeMessage(((MessagePayload<M>) payload).getMessage(), messageCombiner);
+        else if (payload instanceof ViewIncomingPayload)
+            this.mergeViewIncomingPayload((ViewIncomingPayload<M>) payload, messageCombiner);
+        else
+            throw new IllegalArgumentException("The provided payload is an unsupported merge payload: " + payload);
+    }
 }