You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/05 18:25:43 UTC
incubator-tinkerpop git commit: Be smart about caching RDDs in
SparkGraphComputer so that the RDD no longer needs to be read off of disk for
each MapReduce job. Tests run significantly faster.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 5be107e05 -> 0b4c93add
Be smart about caching RDDs in SparkGraphComputer so that the RDD no longer needs to be read off of disk for each MapReduce job. Tests run significantly faster.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0b4c93ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0b4c93ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0b4c93ad
Branch: refs/heads/master
Commit: 0b4c93addaa407413452addd3df287f20e715fcf
Parents: 5be107e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Mar 5 10:25:33 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Mar 5 10:25:42 2015 -0700
----------------------------------------------------------------------
.../computer/spark/SparkGraphComputer.java | 91 ++++++++------------
.../process/computer/spark/SparkVertex.java | 12 +--
.../computer/spark/util/SparkHelper.java | 8 +-
3 files changed, 48 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0b4c93ad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 4ee61cb..92b60ab 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SApacheConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SparkHelper;
@@ -120,28 +121,29 @@ public final class SparkGraphComputer implements GraphComputer {
final long startTime = System.currentTimeMillis();
SparkMemory memory = null;
SparkHelper.deleteOutputDirectory(hadoopConfiguration);
- ////////////////////////////////
- // process the vertex program //
- ////////////////////////////////
- if (null != this.vertexProgram) {
- // set up the spark job
- final SparkConf sparkConfiguration = new SparkConf();
- sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
- hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
- if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
- hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
-
- // execute the vertex program
- try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
- // add the project jars to the cluster
- SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
- // create a message-passing friendly rdd from the hadoop input format
- JavaPairRDD<Object, SparkPayload<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
- (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
- NullWritable.class,
- VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
+ // wire up a spark context
+ final SparkConf sparkConfiguration = new SparkConf();
+ sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram + "[" + this.mapReducers + "]");
+ hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+ if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
+ hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
+ // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
+ try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+ // add the project jars to the cluster
+ SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
+
+ // create a message-passing friendly rdd from the hadoop input format
+ JavaPairRDD<Object, SparkPayload<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+ (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+ NullWritable.class,
+ VertexWritable.class)
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
+
+ ////////////////////////////////
+ // process the vertex program //
+ ////////////////////////////////
+ if (null != this.vertexProgram) {
// set up the vertex program and wire up configurations
memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
this.vertexProgram.setup(memory); // TODO: setup variables are not being broadcasted on first call (force using broadcast or dummy reduce that has a memory reference?)
@@ -162,50 +164,33 @@ public final class SparkGraphComputer implements GraphComputer {
} else
memory.incrIteration();
}
-
// write the output graph back to disk
- SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
+ SparkHelper.saveGraphRDD(graphRDD, hadoopConfiguration);
}
- }
-
- //////////////////////////////
- // process the map reducers //
- //////////////////////////////
- final Memory.Admin finalMemory = null == memory ? new DefaultMemory() : new DefaultMemory(memory);
- for (final MapReduce mapReduce : this.mapReducers) {
- // set up the spark job
- final SparkConf sparkConfiguration = new SparkConf();
- sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + mapReduce);
- hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
- if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
- hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, null == this.vertexProgram ?
- hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION) : // if no vertex program grab the graph from the input location
- hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
-
- // execute the map reduce job
- try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
- SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
- final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
- (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
- NullWritable.class,
- VertexWritable.class);
+ graphRDD = graphRDD.cache();
+ //////////////////////////////
+ // process the map reducers //
+ //////////////////////////////
+ final Memory.Admin finalMemory = null == memory ? new DefaultMemory() : new DefaultMemory(memory);
+ for (final MapReduce mapReduce : this.mapReducers) {
+ // execute the map reduce job
final SApacheConfiguration newApacheConfiguration = new SApacheConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
- final JavaPairRDD mapRDD = SparkHelper.executeMap(hadoopGraphRDD, mapReduce, newApacheConfiguration);
- // combine todo
+ final JavaPairRDD mapRDD = SparkHelper.executeMap(graphRDD, mapReduce, newApacheConfiguration);
+ // combine TODO? is this really needed
// reduce
final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
-
// write the map reduce output back to disk (memory)
SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
}
+ // close the context or else bad things happen
+ sparkContext.close();
+ // update runtime and return the newly computed graph
+ finalMemory.setRuntime(System.currentTimeMillis() - startTime);
+ return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph), finalMemory.asImmutable());
}
-
- // update runtime and return the newly computed graph
- finalMemory.setRuntime(System.currentTimeMillis() - startTime);
- return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph), finalMemory.asImmutable());
}
);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0b4c93ad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
index c7bd37a..905aa07 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
@@ -45,16 +45,16 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
private static GryoWriter GRYO_WRITER = GryoWriter.build().create();
private static GryoReader GRYO_READER = GryoReader.build().create();
- private static final String VERTEX_ID = Graph.Hidden.hide("giraph.gremlin.vertexId");
// TODO: Wrapped vertex -- need VertexProgram in partition (broadcast variable?)
+ private final Object vertexId;
private transient TinkerVertex vertex;
private byte[] vertexBytes;
public SparkVertex(final TinkerVertex vertex) {
this.vertex = vertex;
- this.vertex.graph().variables().set(VERTEX_ID, this.vertex.id());
+ this.vertexId = vertex.id();
}
@Override
@@ -64,7 +64,7 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
@Override
public Object id() {
- return this.vertex.id();
+ return this.vertexId;
}
@Override
@@ -114,7 +114,7 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
@Override
public int hashCode() {
- return ElementHelper.hashCode(this);
+ return this.vertexId.hashCode();
}
@Override
@@ -134,7 +134,7 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
this.inflateVertex();
}
- private final void inflateVertex() {
+ public final void inflateVertex() {
if (null != this.vertex)
return;
@@ -144,7 +144,7 @@ public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable
GRYO_READER.readGraph(bis, tinkerGraph);
bis.close();
this.vertexBytes = null;
- this.vertex = (TinkerVertex) tinkerGraph.iterators().vertexIterator(tinkerGraph.variables().get(VERTEX_ID).get()).next();
+ this.vertex = (TinkerVertex) tinkerGraph.iterators().vertexIterator(this.vertexId).next();
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0b4c93ad/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index c7c7dc6..3661a9e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -104,11 +104,11 @@ public final class SparkHelper {
return current;
}
- public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD, final MapReduce<K, V, ?, ?, ?> globalMapReduce, final Configuration apacheConfiguration) {
- JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.mapPartitionsToPair(partitionIterator -> {
+ public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> globalMapReduce, final Configuration apacheConfiguration) {
+ JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
- partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().get(), mapEmitter));
+ partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().asVertexPayload().getVertex(), mapEmitter));
return mapEmitter.getEmissions();
});
if (globalMapReduce.getMapKeySort().isPresent())
@@ -141,7 +141,7 @@ public final class SparkHelper {
}
}
- public static <M> void saveVertexProgramRDD(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ public static <M> void saveGraphRDD(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a <nullwritable,vertexwritable> stream for output