You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/26 22:46:22 UTC
incubator-tinkerpop git commit: being smart about payload type. still
the inane problem on large recursive RDDs.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 774a87e04 -> b4fdfd779
being smart about payload type. still the inane problem on large recursive RDDs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b4fdfd77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b4fdfd77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b4fdfd77
Branch: refs/heads/master
Commit: b4fdfd779aced3366da5f5834513d71cdd639d94
Parents: 774a87e
Author: okram <ok...@apache.org>
Authored: Thu Mar 26 15:46:18 2015 -0600
Committer: okram <ok...@apache.org>
Committed: Thu Mar 26 15:46:18 2015 -0600
----------------------------------------------------------------------
.../hadoop/process/computer/spark/SparkExecutor.java | 12 +++++++++---
.../process/computer/spark/SparkGraphComputer.java | 2 +-
.../hadoop/process/computer/spark/SparkPayload.java | 11 +++--------
3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4fdfd77/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 44ac16b..58f4ad1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -75,7 +75,7 @@ public final class SparkExecutor {
vertex._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
return () -> IteratorUtils.concat(
IteratorUtils.of(vertex),
- IteratorUtils.map(vertex._2().asVertexPayload().getOutgoingMessages().iterator(),
+ IteratorUtils.map(vertex._2().asVertexPayload().detachOutgoingMessages(),
message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2()))));
});
@@ -84,8 +84,14 @@ public final class SparkExecutor {
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
current = current.reduceByKey((payloadA, payloadB) -> {
if (payloadA.isVertex()) {
- payloadA.addMessages(payloadB.getMessages(), messageCombiner);
- return payloadA;
+ if (payloadB.isVertex()) { // TODO: total hack cause of something weird with recursive RDDs
+ final int sizeA = payloadA.getMessages().size();
+ final int sizeB = payloadB.getMessages().size();
+ return sizeA >= sizeB ? payloadA : payloadB;
+ } else {
+ payloadA.addMessages(payloadB.getMessages(), messageCombiner);
+ return payloadA;
+ }
} else {
payloadB.addMessages(payloadA.getMessages(), messageCombiner);
return payloadB;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4fdfd77/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 733f2a2..fc83bd6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -208,7 +208,7 @@ public final class SparkGraphComputer implements GraphComputer {
vertex.asVertexPayload().getOutgoingMessages().clear();
vertex.asVertexPayload().getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
return vertex;
- });
+ }); // todo: cache()?
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4fdfd77/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
index ebb4b18..3ec1d5b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import java.util.List;
+import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -29,15 +30,9 @@ public interface SparkPayload<M> {
public default void addMessages(final List<M> otherMessages, final MessageCombiner<M> messageCombiner) {
if (null != messageCombiner) {
- M message = null;
- for (final M m : this.getMessages()) {
- message = null == message ? m : messageCombiner.combine(message, m);
- }
- for (final M m : otherMessages) {
- message = null == message ? m : messageCombiner.combine(message, m);
- }
+ final M message = Stream.concat(this.getMessages().stream(),otherMessages.stream()).reduce(messageCombiner::combine).get();
this.getMessages().clear();
- if (null != message) this.getMessages().add(message);
+ this.getMessages().add(message);
} else {
this.getMessages().addAll(otherMessages);
}