You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 15:42:01 UTC

[06/20] incubator-tinkerpop git commit: Some really cool lazy optimizations to SparkGraphComputer. I 'get it' now. Easy peasy lemon squeezy.

Some really cool lazy optimizations to SparkGraphComputer. I 'get it' now. Easy peasy lemon squeezy.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/84be267a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/84be267a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/84be267a

Branch: refs/heads/master
Commit: 84be267aaf4a3ec2c3700af4c10586212cf894c8
Parents: 1020fd2
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Mar 2 19:23:12 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Mar 2 19:23:12 2015 -0700

----------------------------------------------------------------------
 .../computer/spark/GraphComputerRDD.java        | 39 +++++++++-----------
 .../computer/spark/SparkGraphComputer.java      | 14 ++-----
 2 files changed, 22 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/84be267a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
index c99b108..abf0ac6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
@@ -25,6 +25,7 @@ import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.apache.spark.rdd.RDD;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 import scala.reflect.ManifestFactory;
 
@@ -45,20 +46,30 @@ public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>>
         super(rdd.rdd(), ManifestFactory.classType(Object.class), ManifestFactory.classType(SparkMessenger.class));
     }
 
-    public GraphComputerRDD completeIteration() {
+    public GraphComputerRDD execute(final Configuration configuration, final SparkMemory memory) {
         JavaPairRDD<Object, SparkMessenger<M>> current = this;
-        // clear all previous incoming messages
-        current = current.mapValues(messenger -> {
-            messenger.clearIncomingMessages();
-            return messenger;
+        // execute vertex program
+        current = current.mapPartitionsToPair(iterator -> {
+            final VertexProgram<M> vertexProgram = VertexProgram.createVertexProgram(configuration);
+            return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(iterator, tuple -> {
+                vertexProgram.execute(tuple._2().vertex, tuple._2(), memory);
+                return tuple;
+            });
         });
+        // clear all previous incoming messages
+        if(!memory.isInitialIteration()) {
+            current = current.mapValues(messenger -> {
+                messenger.clearIncomingMessages();
+                return messenger;
+            });
+        }
         // emit messages
         current = current.<Object, SparkMessenger<M>>flatMapToPair(tuple -> {
             final List<Tuple2<Object, SparkMessenger<M>>> list = tuple._2().outgoing.entrySet()
                     .stream()
                     .map(entry -> new Tuple2<>(entry.getKey(), new SparkMessenger<>(new ToyVertex(entry.getKey()), entry.getValue())))
-                    .collect(Collectors.toList());
-            list.add(new Tuple2<>(tuple._1(), tuple._2()));
+                    .collect(Collectors.toList());          // the message vertices
+            list.add(new Tuple2<>(tuple._1(), tuple._2())); // the raw vertex
             return list;
         });
         // "message pass" via reduction
@@ -73,20 +84,6 @@ public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>>
             messenger.clearOutgoingMessages();
             return messenger;
         });
-        current.count(); // TODO: necessary for BSP?
-        return GraphComputerRDD.of(current);
-    }
-
-    private static void doNothing() {
-
-    }
-
-    public GraphComputerRDD execute(final Configuration configuration, final SparkMemory memory) {
-        JavaPairRDD<Object, SparkMessenger<M>> current = this;
-        current = current.mapValues(messenger -> {
-            VertexProgram.createVertexProgram(configuration).execute(messenger.vertex, messenger, memory);
-            return messenger;
-        });
         return GraphComputerRDD.of(current);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/84be267a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 7478998..107f1bc 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -18,12 +18,10 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -41,7 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -81,8 +78,6 @@ public class SparkGraphComputer implements GraphComputer {
         JavaPairRDD<Object, SparkMessenger<Double>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
 
         GraphComputerRDD<Double> g = GraphComputerRDD.of(rdd2);
-        FileUtils.deleteDirectory(new File("/tmp/test"));
-        g.saveAsObjectFile("/tmp/test");
 
         final org.apache.commons.configuration.Configuration vertexProgram = new SerializableConfiguration();
         final PageRankVertexProgram pageRankVertexProgram = PageRankVertexProgram.build().create();
@@ -90,18 +85,17 @@ public class SparkGraphComputer implements GraphComputer {
         final SparkMemory memory = new SparkMemory(Collections.emptySet());
 
         while (!pageRankVertexProgram.terminate(memory)) {
-            g = GraphComputerRDD.of((JavaRDD) sc.objectFile("/tmp/test"));
             g = g.execute(vertexProgram, memory);
-            g = g.completeIteration();
+            g.foreachPartition(iterator -> doNothing());
             memory.incrIteration();
-            FileUtils.deleteDirectory(new File("/tmp/test"));
-            g.saveAsObjectFile("/tmp/test");
-
         }
         g.foreach(t -> System.out.println(t._2().vertex.property(PageRankVertexProgram.PAGE_RANK) + "-->" + t._2().vertex.value("name")));
         System.out.println(g.count());
     }
 
+    private static final void doNothing() {
+    }
+
 
     @Override
     public GraphComputer isolation(final Isolation isolation) {