Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/26 22:29:18 UTC

incubator-tinkerpop git commit: Got back to where I was around 10am this morning. Lazy iterators even though the Spark Java API says Iterable.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 53ebfead8 -> 774a87e04


Got back to where I was around 10am this morning. Lazy iterators even though the Spark Java API says Iterable.
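
The headline change: Spark's Java API declares the function passed to mapPartitionsToPair and flatMapToPair as returning an Iterable, which invites buffering each partition into an ArrayList before returning it. But java.lang.Iterable has a single abstract method, iterator(), so a lambda can hand back the live iterator and the partition streams through without ever being materialized. A minimal, standalone sketch of the trick (plain Java, no Spark or TinkerPop dependency):

    import java.util.Arrays;
    import java.util.Iterator;

    public class LazyIterableSketch {
        public static void main(final String[] args) {
            final Iterator<Integer> partitionIterator = Arrays.asList(1, 2, 3).iterator();
            // Iterable's only abstract method is iterator(), so this lambda is a
            // complete implementation: no intermediate list is built, and elements
            // are produced only as the consumer pulls them.
            final Iterable<Integer> lazy = () -> partitionIterator;
            for (final int i : lazy)
                System.out.println(i);
        }
    }

The usual caveat applies: an Iterable built this way can be iterated only once, which is presumably fine here since Spark consumes the result of a partition function a single time.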


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/774a87e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/774a87e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/774a87e0

Branch: refs/heads/master
Commit: 774a87e04d969518df554d757126afc8e4899e62
Parents: 53ebfea
Author: okram <ok...@apache.org>
Authored: Thu Mar 26 15:29:02 2015 -0600
Committer: okram <ok...@apache.org>
Committed: Thu Mar 26 15:29:14 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java   | 87 +++++++++-----------
 .../computer/spark/SparkGraphComputer.java      | 16 ++--
 2 files changed, 44 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/774a87e0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index d9a2537..44ac16b 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -26,12 +26,6 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessagePayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkVertexPayload;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
@@ -41,11 +35,10 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -56,6 +49,10 @@ public final class SparkExecutor {
     private SparkExecutor() {
     }
 
+    private static final void doNothing() {
+        // a cheap action used only to force evaluation of the RDD
+    }
+
     public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
         JavaPairRDD<Object, SparkPayload<M>> current = graphRDD;
         // execute vertex program
@@ -63,52 +60,38 @@ public final class SparkExecutor {
             final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
             final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
             workerVertexProgram.workerIterationStart(memory);
-            final List<Tuple2<Object, SparkPayload<M>>> emission = new ArrayList<>();
-            partitionIterator.forEachRemaining(keyValue -> {
-                keyValue._2().asVertexPayload().getOutgoingMessages().clear();
-                workerVertexProgram.execute(ComputerGraph.of(keyValue._2().asVertexPayload().getVertex(), elementComputeKeys), keyValue._2().asVertexPayload(), memory);
-                emission.add(keyValue);
+            return () -> IteratorUtils.map(partitionIterator, vertex -> {
+                vertex._2().asVertexPayload().getOutgoingMessages().clear(); // there should be no outgoing messages at this point
+                workerVertexProgram.execute(ComputerGraph.of(vertex._2().asVertexPayload().getVertex(), elementComputeKeys), vertex._2().asVertexPayload(), memory);
+                if (!partitionIterator.hasNext())
+                    workerVertexProgram.workerIterationEnd(memory);
+                vertex._2().getMessages().clear(); // there should be no incoming messages at this point (only outgoing messages)
+                return vertex;
             });
-            workerVertexProgram.workerIterationEnd(memory);
-            return emission;
         });
 
         // emit messages by appending them to the graph as message payloads
-        current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> {
-            keyValue._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
-            final List<Tuple2<Object, SparkPayload<M>>> list = new ArrayList<>();
-            list.add(keyValue);    // this is a vertex
-            keyValue._2().asVertexPayload().getOutgoingMessages().forEach(message -> list.add(new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2())))); // this is a message
-            return list;
+        current = current.<Object, SparkPayload<M>>flatMapToPair(vertex -> {
+            vertex._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
+            return () -> IteratorUtils.concat(
+                    IteratorUtils.of(vertex),
+                    IteratorUtils.map(vertex._2().asVertexPayload().getOutgoingMessages().iterator(),
+                            message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2()))));
+
         });
 
         // "message pass" by merging the message payloads with the vertex payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
         current = current.reduceByKey((payloadA, payloadB) -> {
             if (payloadA.isVertex()) {
-                final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadA.asVertexPayload().getVertex());
-                vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
-                vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
-                return vertexPayload;
-            } else if (payloadB.isVertex()) {
-                final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadB.asVertexPayload().getVertex());
-                vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
-                vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
-                return vertexPayload;
+                payloadA.addMessages(payloadB.getMessages(), messageCombiner);
+                return payloadA;
             } else {
-                final SparkMessagePayload<M> messagePayload = new SparkMessagePayload<>();
-                messagePayload.addMessages(payloadA.getMessages(), messageCombiner);
-                messagePayload.addMessages(payloadB.getMessages(), messageCombiner);
-                return messagePayload;
+                payloadB.addMessages(payloadA.getMessages(), messageCombiner);
+                return payloadB;
             }
         });
-
-        // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably cause of concurrent modification issues prior to reduceByKey)
-        current = current.mapValues(vertexPayload -> {
-            vertexPayload.asVertexPayload().getOutgoingMessages().clear();
-            return vertexPayload;
-        });
-
+        current.foreach(vertex -> doNothing()); // TODO: I think this is a fast way to execute the RDD (wish there was an "execute()" method).
         return current;
     }
 
@@ -116,10 +99,13 @@ public final class SparkExecutor {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
             workerMapReduce.workerStart(MapReduce.Stage.MAP);
-            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
-            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().getVertex(), mapEmitter));
-            workerMapReduce.workerEnd(MapReduce.Stage.MAP);
-            return mapEmitter.getEmissions();
+            return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
+                final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
+                workerMapReduce.map(keyValue._2().getVertex(), mapEmitter);
+                if (!partitionIterator.hasNext())
+                    workerMapReduce.workerEnd(MapReduce.Stage.MAP);
+                return mapEmitter.getEmissions().iterator();
+            });
         });
         if (mapReduce.getMapKeySort().isPresent())
             mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
@@ -132,10 +118,13 @@ public final class SparkExecutor {
         JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
             workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
-            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
-            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
-            workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
-            return reduceEmitter.getEmissions();
+            return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
+                final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
+                workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
+                if (!partitionIterator.hasNext())
+                    workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
+                return reduceEmitter.getEmissions().iterator();
+            });
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
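
A subtlety in the SparkExecutor rewrite above: with the eager forEachRemaining loops gone, workerIterationEnd() and workerEnd() can no longer run after the loop, so each mapping function checks partitionIterator.hasNext() after handling an element and fires the end hook once the source is exhausted. The shape of that pattern as a self-contained sketch (the names here are illustrative, not TinkerPop API):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.function.UnaryOperator;

    public class EndHookSketch {
        // Lazily map over an iterator, running an end-of-partition hook right
        // after the final element has been produced.
        static <T> Iterator<T> mapWithEndHook(final Iterator<T> source,
                                              final UnaryOperator<T> function,
                                              final Runnable endHook) {
            return new Iterator<T>() {
                public boolean hasNext() {
                    return source.hasNext();
                }

                public T next() {
                    final T result = function.apply(source.next());
                    if (!source.hasNext())
                        endHook.run(); // the element just mapped was the last one
                    return result;
                }
            };
        }

        public static void main(final String[] args) {
            mapWithEndHook(Arrays.asList("a", "b").iterator(),
                    String::toUpperCase,
                    () -> System.out.println("worker iteration end"))
                    .forEachRemaining(System.out::println);
        }
    }

Note that the hook never fires for an empty partition; a pattern like this trades that edge case for laziness.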

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/774a87e0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index f43727f..733f2a2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -183,7 +183,6 @@ public final class SparkGraphComputer implements GraphComputer {
                             while (true) {
                                 memory.setInTask(true);
                                 graphRDD = SparkExecutor.executeStep(graphRDD, memory, vertexProgramConfiguration);
-                                graphRDD.foreachPartition(iterator -> doNothing()); // TODO: i think this is a fast way to execute the rdd (wish there was a "execute()" method).
                                 memory.setInTask(false);
                                 if (this.vertexProgram.terminate(memory))
                                     break;
@@ -203,12 +202,13 @@ public final class SparkGraphComputer implements GraphComputer {
                         // process the map reducers //
                         //////////////////////////////
                         if (!this.mapReducers.isEmpty()) {
-                            // drop all edges in the graphRDD as edges are not needed in the map reduce jobs
-                            graphRDD = graphRDD.mapToPair(tuple -> {
-                                tuple._2().asVertexPayload().getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
-                                return tuple;
+                            // drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
+                            graphRDD = graphRDD.mapValues(vertex -> {
+                                vertex.getMessages().clear();
+                                vertex.asVertexPayload().getOutgoingMessages().clear();
+                                vertex.asVertexPayload().getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
+                                return vertex;
                             });
-                            graphRDD = graphRDD.cache();
                             for (final MapReduce mapReduce : this.mapReducers) {
                                 // execute the map reduce job
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
@@ -234,10 +234,6 @@ public final class SparkGraphComputer implements GraphComputer {
 
     /////////////////
 
-    private static final void doNothing() {
-        // a cheap action
-    }
-
     private static void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
         if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
             final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
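
Finally, the doNothing() action moves out of SparkGraphComputer and into SparkExecutor.executeStep() itself (see the first file above). Since every step is now assembled from lazy transformations, nothing actually executes until an action pulls data through the pipeline, and a per-element no-op foreach serves as that action. A minimal sketch of the idea against the Spark Java API (standalone, not TinkerPop code):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    public class ForceEvaluationSketch {
        public static void main(final String[] args) {
            final JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("force").setMaster("local"));
            // map() is a lazy transformation -- nothing runs yet
            final JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3))
                    .map(i -> i + 1);
            // foreach() is an action: it materializes the pipeline while doing
            // no work per element and shipping nothing back to the driver
            rdd.foreach(i -> {
            });
            sc.stop();
        }
    }

An action like count() would force evaluation just as well; the empty foreach simply avoids computing anything beyond the pipeline itself.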