Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/05 18:25:43 UTC

incubator-tinkerpop git commit: Smarter caching of RDDs in SparkGraphComputer: the graph RDD no longer needs to be read off of disk for each MapReduce job. Tests run significantly faster.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 5be107e05 -> 0b4c93add


Smarter caching of RDDs in SparkGraphComputer: the graph RDD no longer needs to be read off of disk for each MapReduce job. Tests run significantly faster.
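
For readers skimming the thread, the core idea is the standard Spark pattern sketched below. This is a minimal, hypothetical illustration (class, app, and variable names are made up and are not TinkerPop code): the expensive source read happens once, the resulting RDD is cache()d, and every later job reuses the in-memory copy instead of re-reading its input.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch, not TinkerPop code: build the RDD once, cache() it,
// and let every subsequent job reuse the in-memory copy instead of
// re-reading the source data for each job.
public class CacheOnceSketch {
    public static void main(final String[] args) {
        final SparkConf conf = new SparkConf().setAppName("cache-once-sketch").setMaster("local[*]");
        try (final JavaSparkContext sc = new JavaSparkContext(conf)) {
            // stand-in for the expensive newAPIHadoopRDD read off of disk
            final JavaRDD<String> graph = sc.parallelize(Arrays.asList("a", "b", "c")).cache();
            final long total = graph.count();                              // first job materializes and caches the RDD
            final long nonEmpty = graph.filter(s -> !s.isEmpty()).count(); // later jobs hit the cached copy
            System.out.println(total + " / " + nonEmpty);
        }
    }
}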


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0b4c93ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0b4c93ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0b4c93ad

Branch: refs/heads/master
Commit: 0b4c93addaa407413452addd3df287f20e715fcf
Parents: 5be107e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Mar 5 10:25:33 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Mar 5 10:25:42 2015 -0700

----------------------------------------------------------------------
 .../computer/spark/SparkGraphComputer.java      | 91 ++++++++------------
 .../process/computer/spark/SparkVertex.java     | 12 +--
 .../computer/spark/util/SparkHelper.java        |  8 +-
 3 files changed, 48 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0b4c93ad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 4ee61cb..92b60ab 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SApacheConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SparkHelper;
@@ -120,28 +121,29 @@ public final class SparkGraphComputer implements GraphComputer {
                     final long startTime = System.currentTimeMillis();
                     SparkMemory memory = null;
                     SparkHelper.deleteOutputDirectory(hadoopConfiguration);
-                    ////////////////////////////////
-                    // process the vertex program //
-                    ////////////////////////////////
-                    if (null != this.vertexProgram) {
-                        // set up the spark job
-                        final SparkConf sparkConfiguration = new SparkConf();
-                        sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
-                        hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-                        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
-                            hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
-
-                        // execute the vertex program
-                        try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
-                            // add the project jars to the cluster
-                            SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
-                            // create a message-passing friendly rdd from the hadoop input format
-                            JavaPairRDD<Object, SparkPayload<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
-                                    (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
-                                    NullWritable.class,
-                                    VertexWritable.class)
-                                    .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
 
+                    // wire up a spark context
+                    final SparkConf sparkConfiguration = new SparkConf();
+                    sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram + "[" + this.mapReducers + "]");
+                    hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+                    if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
+                        hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
+                    // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
+                    try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+                        // add the project jars to the cluster
+                        SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
+
+                        // create a message-passing friendly rdd from the hadoop input format
+                        JavaPairRDD<Object, SparkPayload<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+                                NullWritable.class,
+                                VertexWritable.class)
+                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
+
+                        ////////////////////////////////
+                        // process the vertex program //
+                        ////////////////////////////////
+                        if (null != this.vertexProgram) {
                             // set up the vertex program and wire up configurations
                             memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                             this.vertexProgram.setup(memory); // TODO: setup variables are not being broadcasted on first call (force using broadcast or dummy reduce that has a memory reference?)
@@ -162,50 +164,33 @@ public final class SparkGraphComputer implements GraphComputer {
                                 } else
                                     memory.incrIteration();
                             }
-
                             // write the output graph back to disk
-                            SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
+                            SparkHelper.saveGraphRDD(graphRDD, hadoopConfiguration);
                         }
-                    }
-
-                    //////////////////////////////
-                    // process the map reducers //
-                    //////////////////////////////
-                    final Memory.Admin finalMemory = null == memory ? new DefaultMemory() : new DefaultMemory(memory);
-                    for (final MapReduce mapReduce : this.mapReducers) {
-                        // set up the spark job
-                        final SparkConf sparkConfiguration = new SparkConf();
-                        sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + mapReduce);
-                        hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
-                        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
-                            hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, null == this.vertexProgram ?
-                                    hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION) : // if no vertex program grab the graph from the input location
-                                    hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
-
-                        // execute the map reduce job
-                        try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
-                            SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
-                            final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
-                                    (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
-                                    NullWritable.class,
-                                    VertexWritable.class);
 
+                        graphRDD = graphRDD.cache();
+                        //////////////////////////////
+                        // process the map reducers //
+                        //////////////////////////////
+                        final Memory.Admin finalMemory = null == memory ? new DefaultMemory() : new DefaultMemory(memory);
+                        for (final MapReduce mapReduce : this.mapReducers) {
+                            // execute the map reduce job
                             final SApacheConfiguration newApacheConfiguration = new SApacheConfiguration(apacheConfiguration);
                             mapReduce.storeState(newApacheConfiguration);
                             // map
-                            final JavaPairRDD mapRDD = SparkHelper.executeMap(hadoopGraphRDD, mapReduce, newApacheConfiguration);
-                            // combine todo
+                            final JavaPairRDD mapRDD = SparkHelper.executeMap(graphRDD, mapReduce, newApacheConfiguration);
+                            // combine TODO? is this really needed
                             // reduce
                             final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
-
                             // write the map reduce output back to disk (memory)
                             SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                         }
+                        // close the context or else bad things happen
+                        sparkContext.close();
+                        // update runtime and return the newly computed graph
+                        finalMemory.setRuntime(System.currentTimeMillis() - startTime);
+                        return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph), finalMemory.asImmutable());
                     }
-
-                    // update runtime and return the newly computed graph
-                    finalMemory.setRuntime(System.currentTimeMillis() - startTime);
-                    return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph), finalMemory.asImmutable());
                 }
         );
     }
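
Note that the hunk above adds an import for org.apache.spark.storage.StorageLevel but ends up calling the shorthand graphRDD.cache(). The small helper below is purely hypothetical (not part of the commit) and only shows the relationship between the two forms, assuming the Spark 1.x Java API of the time:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;

// Hypothetical helper, not part of the commit: cache() is shorthand for
// persist(StorageLevel.MEMORY_ONLY()); persist(...) lets the caller choose a
// level that can spill to disk when the graph does not fit in memory.
final class PinRDD {
    static <K, V> JavaPairRDD<K, V> pin(final JavaPairRDD<K, V> rdd, final boolean allowDiskSpill) {
        return allowDiskSpill
                ? rdd.persist(StorageLevel.MEMORY_AND_DISK()) // explicit storage level
                : rdd.cache();                                // default MEMORY_ONLY
    }
}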

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0b4c93ad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
index c7bd37a..905aa07 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
@@ -45,16 +45,16 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
 
     private static GryoWriter GRYO_WRITER = GryoWriter.build().create();
     private static GryoReader GRYO_READER = GryoReader.build().create();
-    private static final String VERTEX_ID = Graph.Hidden.hide("giraph.gremlin.vertexId");
 
     // TODO: Wrapped vertex -- need VertexProgram in partition (broadcast variable?)
 
+    private final Object vertexId;
     private transient TinkerVertex vertex;
     private byte[] vertexBytes;
 
     public SparkVertex(final TinkerVertex vertex) {
         this.vertex = vertex;
-        this.vertex.graph().variables().set(VERTEX_ID, this.vertex.id());
+        this.vertexId = vertex.id();
     }
 
     @Override
@@ -64,7 +64,7 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
 
     @Override
     public Object id() {
-        return this.vertex.id();
+        return this.vertexId;
     }
 
     @Override
@@ -114,7 +114,7 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
 
     @Override
     public int hashCode() {
-        return ElementHelper.hashCode(this);
+        return this.vertexId.hashCode();
     }
 
     @Override
@@ -134,7 +134,7 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
         this.inflateVertex();
     }
 
-    private final void inflateVertex() {
+    public final void inflateVertex() {
         if (null != this.vertex)
             return;
 
@@ -144,7 +144,7 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
             GRYO_READER.readGraph(bis, tinkerGraph);
             bis.close();
             this.vertexBytes = null;
-            this.vertex = (TinkerVertex) tinkerGraph.iterators().vertexIterator(tinkerGraph.variables().get(VERTEX_ID).get()).next();
+            this.vertex = (TinkerVertex) tinkerGraph.iterators().vertexIterator(this.vertexId).next();
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
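
The SparkVertex change keeps the vertex id as a plain field, so id(), hashCode(), and the post-deserialization lookup no longer depend on a hidden graph variable. The pattern is roughly the one below, shown as a simplified, hypothetical analogue that uses plain Java serialization instead of Gryo only to stay self-contained (LazyElement and its members are invented names):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical analogue of SparkVertex (not the real class): the id stays a
// plain field and is always available, while the heavyweight payload is
// deflated to bytes for shipping and re-inflated lazily on first access.
final class LazyElement implements Serializable {
    private final Object id;
    private transient String payload; // stand-in for the wrapped TinkerVertex
    private byte[] payloadBytes;

    LazyElement(final Object id, final String payload) {
        this.id = id;
        this.payload = payload;
    }

    Object id() { return this.id; }

    void deflate() throws IOException {                   // mirrors the deflate-before-shipping step
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (final ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(this.payload);
        }
        this.payloadBytes = bos.toByteArray();
        this.payload = null;
    }

    String payload() {                                     // mirrors inflateVertex()
        if (null == this.payload) {
            try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(this.payloadBytes))) {
                this.payload = (String) in.readObject();
                this.payloadBytes = null;
            } catch (final IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        return this.payload;
    }

    @Override
    public int hashCode() { return this.id.hashCode(); }  // id-based, as in the hunk above

    @Override
    public boolean equals(final Object other) {
        return other instanceof LazyElement && ((LazyElement) other).id.equals(this.id);
    }
}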

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0b4c93ad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index c7c7dc6..3661a9e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -104,11 +104,11 @@ public final class SparkHelper {
         return current;
     }
 
-    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD, final MapReduce<K, V, ?, ?, ?> globalMapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.mapPartitionsToPair(partitionIterator -> {
+    public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> globalMapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
             final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
-            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().get(), mapEmitter));
+            partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().asVertexPayload().getVertex(), mapEmitter));
             return mapEmitter.getEmissions();
         });
         if (globalMapReduce.getMapKeySort().isPresent())
@@ -141,7 +141,7 @@ public final class SparkHelper {
         }
     }
 
-    public static <M> void saveVertexProgramRDD(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+    public static <M> void saveGraphRDD(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
         final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         if (null != outputLocation) {
             // map back to a <nullwritable,vertexwritable> stream for output
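
The SparkHelper change retargets executeMap from a freshly read <NullWritable, VertexWritable> RDD to the cached <Object, SparkPayload> graph RDD. The per-partition shape of that map stage is roughly the hypothetical sketch below (simplified types, invented names, and the Spark 1.x Java API of the time, in which the partition function returns an Iterable of tuples):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Hypothetical stand-in for SparkHelper.executeMap: one emitter list per
// partition, every (id, payload) pair in the partition is mapped, and the
// collected emissions become the new key/value RDD.
final class MapStageSketch {
    static JavaPairRDD<String, Long> mapStage(final JavaPairRDD<Object, String> graphRDD) {
        return graphRDD.mapPartitionsToPair(partition -> {
            final List<Tuple2<String, Long>> emissions = new ArrayList<>(); // per-partition emitter
            partition.forEachRemaining(pair -> emissions.add(new Tuple2<>(pair._2(), 1L)));
            return emissions; // Spark 1.x expects an Iterable here
        });
    }
}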