Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/03/27 12:13:33 UTC
[09/13] incubator-tinkerpop git commit: got back to where I was around 10am this morning. Lazy iterators even though the Spark Java API says Iterable.
got back to where I was around 10am this morning. Lazy iterators even though the Spark Java API says Iterable.
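
The pattern behind that subject line, sketched outside the diff: Spark 1.x's Java API declares that a mapPartitionsToPair function returns an Iterable, but Iterable is a functional interface, so a lambda whose iterator() wraps the partition's source iterator satisfies it lazily — elements are transformed one at a time instead of being buffered into a List first. A minimal, self-contained sketch of the idea (class and method names here are illustrative, not TinkerPop's):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.function.Function;

    public final class LazyIterableSketch {

        // wrap a source iterator in an Iterable that maps elements on demand;
        // nothing is consumed until a caller starts iterating
        public static <A, B> Iterable<B> lazyMap(final Iterator<A> source, final Function<A, B> function) {
            return () -> new Iterator<B>() {
                @Override
                public boolean hasNext() {
                    return source.hasNext();
                }

                @Override
                public B next() {
                    return function.apply(source.next()); // one element at a time
                }
            };
        }

        public static void main(final String[] args) {
            final Iterator<Integer> partition = Arrays.asList(1, 2, 3).iterator();
            lazyMap(partition, i -> i * 10).forEach(System.out::println); // 10, 20, 30
        }
    }

Note the returned Iterable is single-pass since it wraps a live iterator — fine for Spark's per-partition functions, which consume it exactly once.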
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/774a87e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/774a87e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/774a87e0
Branch: refs/heads/variables
Commit: 774a87e04d969518df554d757126afc8e4899e62
Parents: 53ebfea
Author: okram <ok...@apache.org>
Authored: Thu Mar 26 15:29:02 2015 -0600
Committer: okram <ok...@apache.org>
Committed: Thu Mar 26 15:29:14 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 87 +++++++++-----------
.../computer/spark/SparkGraphComputer.java | 16 ++--
2 files changed, 44 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/774a87e0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index d9a2537..44ac16b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -26,12 +26,6 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessagePayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkVertexPayload;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
@@ -41,11 +35,10 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Set;
/**
@@ -56,6 +49,10 @@ public final class SparkExecutor {
private SparkExecutor() {
}
+ private static final void doNothing() {
+ // a cheap action
+ }
+
public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
JavaPairRDD<Object, SparkPayload<M>> current = graphRDD;
// execute vertex program
@@ -63,52 +60,38 @@ public final class SparkExecutor {
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
workerVertexProgram.workerIterationStart(memory);
- final List<Tuple2<Object, SparkPayload<M>>> emission = new ArrayList<>();
- partitionIterator.forEachRemaining(keyValue -> {
- keyValue._2().asVertexPayload().getOutgoingMessages().clear();
- workerVertexProgram.execute(ComputerGraph.of(keyValue._2().asVertexPayload().getVertex(), elementComputeKeys), keyValue._2().asVertexPayload(), memory);
- emission.add(keyValue);
+ return () -> IteratorUtils.map(partitionIterator, vertex -> {
+ vertex._2().asVertexPayload().getOutgoingMessages().clear(); // there should be no outgoing messages at this point
+ workerVertexProgram.execute(ComputerGraph.of(vertex._2().asVertexPayload().getVertex(), elementComputeKeys), vertex._2().asVertexPayload(), memory);
+ if (!partitionIterator.hasNext())
+ workerVertexProgram.workerIterationEnd(memory);
+ vertex._2().getMessages().clear(); // there should be no incoming messages at this point (only outgoing messages)
+ return vertex;
});
- workerVertexProgram.workerIterationEnd(memory);
- return emission;
});
// emit messages by appending them to the graph as message payloads
- current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> {
- keyValue._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
- final List<Tuple2<Object, SparkPayload<M>>> list = new ArrayList<>();
- list.add(keyValue); // this is a vertex
- keyValue._2().asVertexPayload().getOutgoingMessages().forEach(message -> list.add(new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2())))); // this is a message
- return list;
+ current = current.<Object, SparkPayload<M>>flatMapToPair(vertex -> {
+ vertex._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
+ return () -> IteratorUtils.concat(
+ IteratorUtils.of(vertex),
+ IteratorUtils.map(vertex._2().asVertexPayload().getOutgoingMessages().iterator(),
+ message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2()))));
+
});
// "message pass" by merging the message payloads with the vertex payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
current = current.reduceByKey((payloadA, payloadB) -> {
if (payloadA.isVertex()) {
- final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadA.asVertexPayload().getVertex());
- vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
- vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
- return vertexPayload;
- } else if (payloadB.isVertex()) {
- final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadB.asVertexPayload().getVertex());
- vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
- vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
- return vertexPayload;
+ payloadA.addMessages(payloadB.getMessages(), messageCombiner);
+ return payloadA;
} else {
- final SparkMessagePayload<M> messagePayload = new SparkMessagePayload<>();
- messagePayload.addMessages(payloadA.getMessages(), messageCombiner);
- messagePayload.addMessages(payloadB.getMessages(), messageCombiner);
- return messagePayload;
+ payloadB.addMessages(payloadA.getMessages(), messageCombiner);
+ return payloadB;
}
});
-
- // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably cause of concurrent modification issues prior to reduceByKey)
- current = current.mapValues(vertexPayload -> {
- vertexPayload.asVertexPayload().getOutgoingMessages().clear();
- return vertexPayload;
- });
-
+ current.foreach(vertex -> doNothing()); // TODO: I think this is a fast way to execute the RDD (wish there was an "execute()" method).
return current;
}
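
For reference, the merge that reduceByKey performs above can be exercised with a toy, self-contained version (local Spark, String payloads standing in for SparkPayload; all names illustrative, not TinkerPop code): whichever side of the reduction is the vertex record absorbs the other side's messages, so exactly one vertex record survives per key and no intermediate payload objects are allocated.

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public final class MessagePassSketch {
        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setMaster("local").setAppName("message-pass-sketch");
            try (final JavaSparkContext context = new JavaSparkContext(conf)) {
                final JavaPairRDD<Integer, String> graph = context.parallelizePairs(Arrays.asList(
                        new Tuple2<>(1, "vertex[1]"),   // the vertex payload for id 1
                        new Tuple2<>(1, "msg[hello]"),  // messages keyed to the same id
                        new Tuple2<>(1, "msg[world]")));
                final JavaPairRDD<Integer, String> merged = graph.reduceByKey((a, b) ->
                        a.startsWith("vertex") ? a + "+" + b : b + "+" + a); // the vertex side absorbs the message side
                merged.values().collect().forEach(System.out::println); // e.g. vertex[1]+msg[hello]+msg[world]
            }
        }
    }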
@@ -116,10 +99,13 @@ public final class SparkExecutor {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
workerMapReduce.workerStart(MapReduce.Stage.MAP);
- final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
- partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().getVertex(), mapEmitter));
- workerMapReduce.workerEnd(MapReduce.Stage.MAP);
- return mapEmitter.getEmissions();
+ return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
+ final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
+ workerMapReduce.map(keyValue._2().getVertex(), mapEmitter);
+ if (!partitionIterator.hasNext())
+ workerMapReduce.workerEnd(MapReduce.Stage.MAP);
+ return mapEmitter.getEmissions().iterator();
+ });
});
if (mapReduce.getMapKeySort().isPresent())
mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
@@ -132,10 +118,13 @@ public final class SparkExecutor {
JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
- final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
- partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
- workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
- return reduceEmitter.getEmissions();
+ return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
+ final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
+ workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
+ if (!partitionIterator.hasNext())
+ workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
+ return reduceEmitter.getEmissions().iterator();
+ });
});
if (mapReduce.getReduceKeySort().isPresent())
reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
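
Both the map and reduce stages above rely on the same end-of-partition trick: because the returned iterator is consumed lazily, workerEnd can no longer be called after a forEachRemaining loop, so it fires inside the mapping function as soon as the source iterator reports no more elements. A stripped-down sketch of that hook (plain Java, no Spark; names illustrative):

    import java.util.Arrays;
    import java.util.Iterator;

    public final class WorkerEndHookSketch {
        public static void main(final String[] args) {
            final Iterator<String> partitionIterator = Arrays.asList("a", "b", "c").iterator();
            final Iterator<String> mapped = new Iterator<String>() {
                @Override
                public boolean hasNext() {
                    return partitionIterator.hasNext();
                }

                @Override
                public String next() {
                    final String result = partitionIterator.next().toUpperCase();
                    if (!partitionIterator.hasNext())
                        System.out.println("workerEnd()"); // teardown runs while producing the last element
                    return result;
                }
            };
            mapped.forEachRemaining(System.out::println); // prints A, B, workerEnd(), C
        }
    }

One caveat this inherits: if a downstream consumer abandons the iterator early, the teardown never fires.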
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/774a87e0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index f43727f..733f2a2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -183,7 +183,6 @@ public final class SparkGraphComputer implements GraphComputer {
while (true) {
memory.setInTask(true);
graphRDD = SparkExecutor.executeStep(graphRDD, memory, vertexProgramConfiguration);
- graphRDD.foreachPartition(iterator -> doNothing()); // TODO: i think this is a fast way to execute the rdd (wish there was a "execute()" method).
memory.setInTask(false);
if (this.vertexProgram.terminate(memory))
break;
@@ -203,12 +202,13 @@ public final class SparkGraphComputer implements GraphComputer {
// process the map reducers //
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
- // drop all edges in the graphRDD as edges are not needed in the map reduce jobs
- graphRDD = graphRDD.mapToPair(tuple -> {
- tuple._2().asVertexPayload().getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
- return tuple;
+ // drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
+ graphRDD = graphRDD.mapValues(vertex -> {
+ vertex.getMessages().clear();
+ vertex.asVertexPayload().getOutgoingMessages().clear();
+ vertex.asVertexPayload().getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
+ return vertex;
});
- graphRDD = graphRDD.cache();
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
@@ -234,10 +234,6 @@ public final class SparkGraphComputer implements GraphComputer {
/////////////////
- private static final void doNothing() {
- // a cheap action
- }
-
private static void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
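
On the doNothing action that moved from SparkGraphComputer into SparkExecutor: Spark transformations are lazy and there is no execute() on an RDD, so the commit forces each superstep to materialize by invoking a cheap action. A minimal sketch of that idiom (local Spark, illustrative names, not the TinkerPop code):

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public final class ForceEvaluationSketch {
        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setMaster("local").setAppName("force-evaluation-sketch");
            try (final JavaSparkContext context = new JavaSparkContext(conf)) {
                final JavaRDD<Integer> doubled = context.parallelize(Arrays.asList(1, 2, 3))
                        .map(i -> i * 2);  // lazy: nothing has executed yet
                doubled.foreach(i -> { }); // a no-op action forces the computation to run now
            }
        }
    }

Unlike collect(), foreach ships no results back to the driver, which is what makes it a cheap way to trigger execution.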