You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 21:35:59 UTC
incubator-tinkerpop git commit: Lots of optimization goodies here.
Learning more and more about Spark as I go and getting clever about it. My
powers are increasing. Soon I will crush you all with my army of machines.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 439fbb918 -> 4fb660488
Lots of optimization goodies here. Learning more and more about Spark as I go and getting clever about it. My powers are increasing. Soon I will crush you all with my army of machines.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4fb66048
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4fb66048
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4fb66048
Branch: refs/heads/master
Commit: 4fb660488f7f008fa8d7baf1548e0354f9342a93
Parents: 439fbb9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 4 13:35:56 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 4 13:35:56 2015 -0700
----------------------------------------------------------------------
.../process/computer/spark/SparkMessenger.java | 32 +++++++++++---
.../computer/spark/util/SparkHelper.java | 44 +++++++++-----------
2 files changed, 46 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4fb66048/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 812bdd3..4fea02a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
import org.apache.tinkerpop.gremlin.process.Traversal;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.graph.traversal.step.map.VertexStep;
@@ -27,13 +28,17 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -42,7 +47,7 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
private Vertex vertex;
private List<M> incoming;
- private Map<Object, List<M>> outgoing = new HashMap<>();
+ private Map<Object, List<M>> outgoing;
public SparkMessenger() {
@@ -51,6 +56,7 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
public SparkMessenger(final Vertex vertex) {
this.vertex = vertex;
this.incoming = new ArrayList<>();
+ this.outgoing = this.vertex instanceof DetachedVertex ? null : new HashMap<>();
}
public SparkMessenger(final Vertex vertex, final List<M> incomingMessages) {
@@ -70,12 +76,20 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
return this.vertex;
}
- public void setVertex(final Vertex vertex) {
- this.vertex = vertex;
+ public void mergeInMessenger(final SparkMessenger<M> otherMessenger) {
+ this.vertex = otherMessenger.vertex;
+ this.outgoing = otherMessenger.outgoing;
}
- public void addIncomingMessages(final SparkMessenger<M> otherMessenger) {
- this.incoming.addAll(otherMessenger.incoming);
+ public void addIncomingMessages(final SparkMessenger<M> otherMessenger, final Optional<MessageCombiner<M>> messageCombinerOptional) {
+ if (messageCombinerOptional.isPresent()) {
+ final MessageCombiner<M> messageCombiner = messageCombinerOptional.get();
+ final M combinedMessage = Stream.concat(this.incoming.stream(), otherMessenger.incoming.stream()).reduce(messageCombiner::combine).get();
+ this.incoming.clear();
+ this.incoming.add(combinedMessage);
+ } else {
+ this.incoming.addAll(otherMessenger.incoming);
+ }
}
public Set<Map.Entry<Object, List<M>>> getOutgoingMessages() {
@@ -84,11 +98,17 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
@Override
public Iterable<M> receiveMessages(final MessageScope messageScope) {
- return this.incoming;
+ if (null == this.outgoing)
+ throw new IllegalStateException("Message vertices can not receive messages");
+
+ return null == this.incoming ? Collections.emptyList() : this.incoming;
}
@Override
public void sendMessage(final MessageScope messageScope, final M message) {
+ if (null == this.outgoing)
+ throw new IllegalStateException("Message vertices can not send messages");
+
if (messageScope instanceof MessageScope.Local) {
final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4fb66048/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index ff23d36..a974f63 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
@@ -47,6 +48,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -69,13 +71,6 @@ public final class SparkHelper {
});
});
- // clear all previous incoming messages
- if (!memory.isInitialIteration()) { // a quick savings as the initial iteration has no messages yet
- current = current.mapValues(messenger -> {
- messenger.clearIncomingMessages();
- return messenger;
- });
- }
// emit messages by appending them to the graph vertices data as "message vertices"
current = current.<Object, SparkMessenger<M>>flatMapToPair(keyValue -> () -> new Iterator<Tuple2<Object, SparkMessenger<M>>>() {
boolean first = true;
@@ -90,6 +85,7 @@ public final class SparkHelper {
public Tuple2<Object, SparkMessenger<M>> next() {
if (this.first) {
this.first = false;
+ keyValue._2().clearIncomingMessages(); // the raw vertex should not have any incoming messages (should be cleared from the previous stage)
return new Tuple2<Object, SparkMessenger<M>>(keyValue._1(), keyValue._2()); // this is the raw vertex data
} else {
final Map.Entry<Object, List<M>> entry = this.iterator.next();
@@ -98,19 +94,19 @@ public final class SparkHelper {
}
});
- // TODO: local message combiner
- if (globalVertexProgram.getMessageCombiner().isPresent()) {
- /* current = current.combineByKey(messenger -> {
- return messenger;
- });*/
- }
-
// "message pass" via reduction joining the "message vertices" with the graph vertices
- current = current.reduceByKey((a, b) -> {
- if (a.getVertex() instanceof DetachedVertex && !(b.getVertex() instanceof DetachedVertex))
- a.setVertex(b.getVertex());
- a.addIncomingMessages(b);
- return a; // always reduce on the first argument
+ // addIncomingMessages is provided the vertex program message combiner for partition and global level combining
+ final boolean hasMessageCombiner = globalVertexProgram.getMessageCombiner().isPresent();
+ current = current.reduceByKey((messengerA, messengerB) -> {
+ final Optional<MessageCombiner<M>> messageCombinerOptional = hasMessageCombiner ?
+ VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner() : // this is expensive but there is no "reduceByKeyPartition" :(
+ Optional.empty();
+
+ if (messengerA.getVertex() instanceof DetachedVertex && !(messengerB.getVertex() instanceof DetachedVertex)) // fold the message vertices into the graph vertices
+ messengerA.mergeInMessenger(messengerB);
+ messengerA.addIncomingMessages(messengerB, messageCombinerOptional);
+
+ return messengerA; // always reduce on the first argument
});
// clear all previous outgoing messages
@@ -122,10 +118,10 @@ public final class SparkHelper {
}
public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD, final MapReduce<K, V, ?, ?, ?> globalMapReduce, final Configuration apacheConfiguration) {
- JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.mapPartitionsToPair(iterator -> {
+ JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
- iterator.forEachRemaining(tuple -> workerMapReduce.map(tuple._2().get(), mapEmitter));
+ partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().get(), mapEmitter));
return mapEmitter.getEmissions();
});
if (globalMapReduce.getMapKeySort().isPresent())
@@ -136,10 +132,10 @@ public final class SparkHelper {
// TODO: public static executeCombine()
public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> globalMapReduce, final Configuration apacheConfiguration) {
- JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(iterator -> {
+ JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
- iterator.forEachRemaining(tuple -> workerMapReduce.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter));
+ partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
return reduceEmitter.getEmissions();
});
if (globalMapReduce.getReduceKeySort().isPresent())
@@ -174,7 +170,7 @@ public final class SparkHelper {
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a Hadoop stream for output
- mapReduceRDD.mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+ mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
ObjectWritable.class,
ObjectWritable.class,
(Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));