You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/27 18:58:15 UTC
incubator-tinkerpop git commit: Found an RDD persistence 'memory bug'
in SparkGraphComputer while doing large-scale testing with @dkuppitz using 4
Blades and the 2-billion-edge Friendster dataset. Fixed. CTR.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 4dd89e914 -> e5e7cbaa9
Found an RDD persistence 'memory bug' in SparkGraphComputer while doing large-scale testing with @dkuppitz using 4 Blades and the 2-billion-edge Friendster dataset. Fixed. CTR.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e5e7cbaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e5e7cbaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e5e7cbaa
Branch: refs/heads/master
Commit: e5e7cbaa91863e45392ce994782f6855dc680e5e
Parents: 4dd89e9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 27 10:57:59 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 27 10:58:10 2016 -0700
----------------------------------------------------------------------
.../spark/process/computer/SparkExecutor.java | 7 ++---
.../process/computer/SparkGraphComputer.java | 29 ++++++++++++++------
.../computer/SparkHadoopGraphProvider.java | 2 +-
3 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e5e7cbaa/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 5889bdb..f1734c8 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -133,9 +133,9 @@ public final class SparkExecutor {
return newViewIncomingRDD;
}
- public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys, final boolean unpersistInput) {
+ public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
// attach the final computed view to the cached graph
- final JavaPairRDD<Object, VertexWritable> finalGraphRDD = graphRDD.leftOuterJoin(viewIncomingRDD)
+ return graphRDD.leftOuterJoin(viewIncomingRDD)
.mapValues(tuple -> {
final StarGraph.StarVertex vertex = tuple._1().get();
vertex.dropVertexProperties(elementComputeKeys);
@@ -144,9 +144,6 @@ public final class SparkExecutor {
view.clear(); // no longer needed so kill it from memory
return tuple._1();
});
- if (unpersistInput)
- graphRDD.unpersist();
- return finalGraphRDD;
}
/////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e5e7cbaa/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 58a0af6..60da1d7 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -162,6 +162,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
updateLocalConfiguration(sparkContext, sparkConfiguration);
// create a message-passing friendly rdd from the input rdd
JavaPairRDD<Object, VertexWritable> graphRDD;
+ JavaPairRDD<Object, VertexWritable> finalGraphRDD = null;
try {
graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
.newInstance()
@@ -207,36 +208,43 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
// write the graph rdd using the output rdd
final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
- graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys, !inputFromSpark);
+ finalGraphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys);
if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
- hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) &&
- !this.persist.equals(GraphComputer.Persist.NOTHING)) {
+ hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) && !this.persist.equals(Persist.NOTHING)) {
try {
hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
.newInstance()
- .writeGraphRDD(apacheConfiguration, graphRDD);
+ .writeGraphRDD(apacheConfiguration, finalGraphRDD);
} catch (final InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
+ final boolean finalGraphComputed = finalGraphRDD != null;
+ if (!finalGraphComputed)
+ finalGraphRDD = graphRDD;
+
final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
//////////////////////////////
// process the map reducers //
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
- final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = graphRDD.mapValues(vertexWritable -> {
+ finalGraphRDD = finalGraphRDD.mapValues(vertexWritable -> {
vertexWritable.get().dropEdges();
return vertexWritable;
- }); // drop all the edges of the graph as they are not used in mapReduce processing
+ }); // drop all the edges of the graph as they are not used in mapReduce processing
+ if (!outputToSpark)
+ finalGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+ if (!inputFromSpark && finalGraphComputed) // if there was a final graph computed (that is, a vertex program executed), then drop the loaded graph
+ graphRDD.unpersist();
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
- final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
+ final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) finalGraphRDD, mapReduce, newApacheConfiguration);
// combine
final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
// reduce
@@ -253,9 +261,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
}
- // unpersist the graphRDD if it will no longer be used
- if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
+ // unpersist the loaded graphRDD as it will never be used again
+ if (!inputFromSpark && finalGraphComputed)
graphRDD.unpersist();
+ // unpersist the final computed graphRDD if it will not be used again (no PersistedOutputRDD)
+ if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
+ finalGraphRDD.unpersist();
// delete any file system output if persist nothing
if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
if (outputToHDFS)
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e5e7cbaa/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 108d0ed..e2d77c4 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -64,6 +64,6 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
@Override
public GraphTraversalSource traversal(final Graph graph) {
- return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph);
+ return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class).workers(new Random().nextInt(3)+1)).create(graph);
}
}
\ No newline at end of file