You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/26 23:28:28 UTC
incubator-tinkerpop git commit: A reduceByKey as the initial
operation of the graphRDD ensures no duplicate vertices.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master b4fdfd779 -> ba79c0ef4
A reduceByKey as the initial operation of the graphRDD ensures no duplicate vertices.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ba79c0ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ba79c0ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ba79c0ef
Branch: refs/heads/master
Commit: ba79c0ef480b3b6b1eb1c518881e19b1ba1d656a
Parents: b4fdfd7
Author: okram <ok...@apache.org>
Authored: Thu Mar 26 16:28:23 2015 -0600
Committer: okram <ok...@apache.org>
Committed: Thu Mar 26 16:28:23 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 29 +++++++-------------
1 file changed, 10 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ba79c0ef/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 58f4ad1..f3bdcfd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -49,12 +49,8 @@ public final class SparkExecutor {
private SparkExecutor() {
}
- private static final void doNothing() {
- // a cheap action
- }
-
public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
- JavaPairRDD<Object, SparkPayload<M>> current = graphRDD;
+ JavaPairRDD<Object, SparkPayload<M>> current = graphRDD.reduceByKey((payloadA, payloadB) -> payloadA); // TODO: total hack cause of something weird with recursive RDDs
// execute vertex program
current = current.mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
@@ -71,24 +67,19 @@ public final class SparkExecutor {
});
// emit messages by appending them to the graph as message payloads
- current = current.<Object, SparkPayload<M>>flatMapToPair(vertex -> {
- vertex._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
- return () -> IteratorUtils.concat(
- IteratorUtils.of(vertex),
- IteratorUtils.map(vertex._2().asVertexPayload().detachOutgoingMessages(),
- message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2()))));
-
- });
+ current = current.<Object, SparkPayload<M>>flatMapToPair(vertex -> () ->
+ IteratorUtils.concat(
+ IteratorUtils.of(vertex),
+ IteratorUtils.map(vertex._2().asVertexPayload().detachOutgoingMessages(),
+ message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2())))));
// "message pass" by merging the message payloads with the vertex payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
current = current.reduceByKey((payloadA, payloadB) -> {
if (payloadA.isVertex()) {
- if (payloadB.isVertex()) { // TODO: total hack cause of something weird with recursive RDDs
- final int sizeA = payloadA.getMessages().size();
- final int sizeB = payloadB.getMessages().size();
- return sizeA >= sizeB ? payloadA : payloadB;
- } else {
+ if (payloadB.isVertex())
+ throw new IllegalStateException("It should not be the case that two vertices reduce to the same key: " + payloadA.asVertexPayload().getVertex() + "==" + payloadB.asVertexPayload().getVertex());
+ else {
payloadA.addMessages(payloadB.getMessages(), messageCombiner);
return payloadA;
}
@@ -97,7 +88,7 @@ public final class SparkExecutor {
return payloadB;
}
});
- current.foreach(vertex -> doNothing()); // TODO: i think this is a fast way to execute the rdd (wish there was a "execute()" method).
+ current.foreach(vertex -> {}); // TODO: i think this is a fast way to execute the rdd (wish there was a "execute()" method).
return current;
}