You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/02 20:58:58 UTC
incubator-tinkerpop git commit: reduce() in Spark happens locally and
then globally unlike Hadoop where local reduce is called combine. Fixed a bug
in message aggregation for the Spark message passing algorithm.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master b441c7ed9 -> e9a2e410c
reduce() in Spark happens locally and then globally unlike Hadoop where local reduce is called combine. Fixed a bug in message aggregation for the Spark message passing algorithm.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e9a2e410
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e9a2e410
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e9a2e410
Branch: refs/heads/master
Commit: e9a2e410c9ef865223bfb0a7f89224a62d11fd5a
Parents: b441c7e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 12:58:57 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 12:58:57 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 36 ++++--------------
.../spark/payload/ViewIncomingPayload.java | 39 +++++++++++++++++---
2 files changed, 41 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e9a2e410/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index f86c311..bb908aa 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -109,43 +109,21 @@ public final class SparkExecutor {
IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
.reduceByKey((a, b) -> {
if (a instanceof ViewIncomingPayload) {
- if (b instanceof MessagePayload)
- ((ViewIncomingPayload<M>) a).addIncomingMessage(((MessagePayload<M>) b).getMessage(), messageCombiner);
- else if (b instanceof ViewPayload)
- ((ViewIncomingPayload<M>) a).setView(((ViewPayload) b).getView());
- //else if (b instanceof ViewIncomingPayload)
- // throw new IllegalStateException("It should never be the case that two views reduce to the same key");
+ ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
return a;
} else if (b instanceof ViewIncomingPayload) {
- if (a instanceof MessagePayload)
- ((ViewIncomingPayload<M>) b).addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
- else if (a instanceof ViewPayload)
- ((ViewIncomingPayload<M>) b).setView(((ViewPayload) a).getView());
- //else if (a instanceof ViewIncomingPayload)
- // throw new IllegalStateException("It should never be the case that two views reduce to the same key");
+ ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
return b;
} else {
final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
- if (a instanceof MessagePayload)
- c.addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
- else if (a instanceof ViewPayload)
- c.setView(((ViewPayload) a).getView());
- if (b instanceof MessagePayload)
- c.addIncomingMessage(((MessagePayload<M>) b).getMessage(), messageCombiner);
- else if (b instanceof ViewPayload)
- c.setView(((ViewPayload) b).getView());
+ c.mergePayload(a, messageCombiner);
+ c.mergePayload(b, messageCombiner);
return c;
}
})
- .mapValues(payload -> {
- if (payload instanceof ViewIncomingPayload)
- return (ViewIncomingPayload<M>) payload;
- else { // this means the vertex has no incoming messages
- final ViewIncomingPayload<M> viewIncomingPayload = new ViewIncomingPayload<>();
- viewIncomingPayload.setView(((ViewPayload) payload).getView());
- return viewIncomingPayload;
- }
- });
+ .mapValues(payload -> payload instanceof ViewIncomingPayload ?
+ (ViewIncomingPayload<M>) payload :
+ new ViewIncomingPayload<>((ViewPayload) payload));
newViewIncomingRDD.foreachPartition(partitionIterator -> {
}); // need to complete a task so its BSP and the memory for this iteration is updated
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e9a2e410/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
index 2de9736..772052e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
@@ -36,30 +36,59 @@ public class ViewIncomingPayload<M> implements Payload {
private List<DetachedVertexProperty<Object>> view = null;
private final List<M> incomingMessages;
+
+ public ViewIncomingPayload() {
+ this.incomingMessages = null;
+ }
+
public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
}
- public ViewIncomingPayload() {
+ public ViewIncomingPayload(final ViewPayload viewPayload) {
this.incomingMessages = null;
+ this.view = viewPayload.getView();
}
+
public List<DetachedVertexProperty<Object>> getView() {
return null == this.view ? Collections.emptyList() : this.view;
}
- public void setView(final List<DetachedVertexProperty<Object>> view) {
- this.view = view;
- }
public List<M> getIncomingMessages() {
return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
}
- public void addIncomingMessage(final M message, final MessageCombiner<M> messageCombiner) {
+ ////////////////////
+
+
+ private void mergeMessage(final M message, final MessageCombiner<M> messageCombiner) {
if (this.incomingMessages.isEmpty() || null == messageCombiner)
this.incomingMessages.add(message);
else
this.incomingMessages.set(0, messageCombiner.combine(this.incomingMessages.get(0), message));
}
+
+ private void mergeViewIncomingPayload(final ViewIncomingPayload<M> viewIncomingPayload, final MessageCombiner<M> messageCombiner) {
+ if (this.view == null && !viewIncomingPayload.getView().isEmpty())
+ this.view = viewIncomingPayload.getView();
+ else
+ this.view.addAll(viewIncomingPayload.getView());
+
+ for (final M message : viewIncomingPayload.getIncomingMessages()) {
+ this.mergeMessage(message, messageCombiner);
+ }
+ }
+
+ public void mergePayload(final Payload payload, final MessageCombiner<M> messageCombiner) {
+ if (payload instanceof ViewPayload)
+ this.view = ((ViewPayload) payload).getView();
+ else if (payload instanceof MessagePayload)
+ this.mergeMessage(((MessagePayload<M>) payload).getMessage(), messageCombiner);
+ else if (payload instanceof ViewIncomingPayload)
+ this.mergeViewIncomingPayload((ViewIncomingPayload<M>) payload, messageCombiner);
+ else
+ throw new IllegalArgumentException("The provided payload is an unsupported merge payload: " + payload);
+ }
}