You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/03 03:23:04 UTC
incubator-tinkerpop git commit: Some really cool lazy optimizations to SparkGraphComputer. I 'get it' now. Easy peasy lemon squeezy.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/spark 1020fd281 -> 84be267aa
Some really cool lazy optimizations to SparkGraphComputer. I 'get it' now. Easy peasy lemon squeezy.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/84be267a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/84be267a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/84be267a
Branch: refs/heads/spark
Commit: 84be267aaf4a3ec2c3700af4c10586212cf894c8
Parents: 1020fd2
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Mar 2 19:23:12 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Mar 2 19:23:12 2015 -0700
----------------------------------------------------------------------
.../computer/spark/GraphComputerRDD.java | 39 +++++++++-----------
.../computer/spark/SparkGraphComputer.java | 14 ++-----
2 files changed, 22 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/84be267a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
index c99b108..abf0ac6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
@@ -25,6 +25,7 @@ import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.apache.spark.rdd.RDD;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import scala.reflect.ManifestFactory;
@@ -45,20 +46,30 @@ public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>>
super(rdd.rdd(), ManifestFactory.classType(Object.class), ManifestFactory.classType(SparkMessenger.class));
}
- public GraphComputerRDD completeIteration() {
+ public GraphComputerRDD execute(final Configuration configuration, final SparkMemory memory) {
JavaPairRDD<Object, SparkMessenger<M>> current = this;
- // clear all previous incoming messages
- current = current.mapValues(messenger -> {
- messenger.clearIncomingMessages();
- return messenger;
+ // execute vertex program
+ current = current.mapPartitionsToPair(iterator -> {
+ final VertexProgram<M> vertexProgram = VertexProgram.createVertexProgram(configuration);
+ return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(iterator, tuple -> {
+ vertexProgram.execute(tuple._2().vertex, tuple._2(), memory);
+ return tuple;
+ });
});
+ // clear all previous incoming messages
+ if(!memory.isInitialIteration()) {
+ current = current.mapValues(messenger -> {
+ messenger.clearIncomingMessages();
+ return messenger;
+ });
+ }
// emit messages
current = current.<Object, SparkMessenger<M>>flatMapToPair(tuple -> {
final List<Tuple2<Object, SparkMessenger<M>>> list = tuple._2().outgoing.entrySet()
.stream()
.map(entry -> new Tuple2<>(entry.getKey(), new SparkMessenger<>(new ToyVertex(entry.getKey()), entry.getValue())))
- .collect(Collectors.toList());
- list.add(new Tuple2<>(tuple._1(), tuple._2()));
+ .collect(Collectors.toList()); // the message vertices
+ list.add(new Tuple2<>(tuple._1(), tuple._2())); // the raw vertex
return list;
});
// "message pass" via reduction
@@ -73,20 +84,6 @@ public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>>
messenger.clearOutgoingMessages();
return messenger;
});
- current.count(); // TODO: necessary for BSP?
- return GraphComputerRDD.of(current);
- }
-
- private static void doNothing() {
-
- }
-
- public GraphComputerRDD execute(final Configuration configuration, final SparkMemory memory) {
- JavaPairRDD<Object, SparkMessenger<M>> current = this;
- current = current.mapValues(messenger -> {
- VertexProgram.createVertexProgram(configuration).execute(messenger.vertex, messenger, memory);
- return messenger;
- });
return GraphComputerRDD.of(current);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/84be267a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 7478998..107f1bc 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -18,12 +18,10 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -41,7 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -81,8 +78,6 @@ public class SparkGraphComputer implements GraphComputer {
JavaPairRDD<Object, SparkMessenger<Double>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
GraphComputerRDD<Double> g = GraphComputerRDD.of(rdd2);
- FileUtils.deleteDirectory(new File("/tmp/test"));
- g.saveAsObjectFile("/tmp/test");
final org.apache.commons.configuration.Configuration vertexProgram = new SerializableConfiguration();
final PageRankVertexProgram pageRankVertexProgram = PageRankVertexProgram.build().create();
@@ -90,18 +85,17 @@ public class SparkGraphComputer implements GraphComputer {
final SparkMemory memory = new SparkMemory(Collections.emptySet());
while (!pageRankVertexProgram.terminate(memory)) {
- g = GraphComputerRDD.of((JavaRDD) sc.objectFile("/tmp/test"));
g = g.execute(vertexProgram, memory);
- g = g.completeIteration();
+ g.foreachPartition(iterator -> doNothing());
memory.incrIteration();
- FileUtils.deleteDirectory(new File("/tmp/test"));
- g.saveAsObjectFile("/tmp/test");
-
}
g.foreach(t -> System.out.println(t._2().vertex.property(PageRankVertexProgram.PAGE_RANK) + "-->" + t._2().vertex.value("name")));
System.out.println(g.count());
}
+ private static final void doNothing() {
+ }
+
@Override
public GraphComputer isolation(final Isolation isolation) {