You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/27 23:28:48 UTC
incubator-tinkerpop git commit: organization tweaks around
persistence of RDDs. Minor nothings.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 64227b8a3 -> f8a252202
organization tweaks around persistence of RDDs. Minor nothings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f8a25220
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f8a25220
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f8a25220
Branch: refs/heads/master
Commit: f8a252202eb840c13ae96aa30a797c050c8d3624
Parents: 64227b8
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 27 15:28:38 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 27 15:28:38 2016 -0700
----------------------------------------------------------------------
.../process/computer/SparkGraphComputer.java | 55 ++++++++++----------
.../computer/SparkHadoopGraphProvider.java | 6 ++-
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f8a25220/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 818b9a9..59da7d6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -161,21 +161,21 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
Spark.create(sparkContext.sc()); // this is the context RDD holder that prevents GC
updateLocalConfiguration(sparkContext, sparkConfiguration);
// create a message-passing friendly rdd from the input rdd
- JavaPairRDD<Object, VertexWritable> graphRDD;
- JavaPairRDD<Object, VertexWritable> finalGraphRDD = null;
+ JavaPairRDD<Object, VertexWritable> loadedGraphRDD;
+ JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
try {
- graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+ loadedGraphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
.newInstance()
.readGraphRDD(apacheConfiguration, sparkContext);
if (this.workersSet) {
- if (graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
- graphRDD = graphRDD.coalesce(this.workers);
- else if (graphRDD.partitions().size() < this.workers) // ensures that the graphRDD does not have less partitions than workers
- graphRDD = graphRDD.repartition(this.workers);
+ if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the loadedGraphRDD does not have more partitions than workers
+ loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
+ else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the loadedGraphRDD does not have fewer partitions than workers
+ loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
}
// persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
if (!inputFromSpark)
- graphRDD = graphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+ loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
} catch (final InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -197,7 +197,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// execute the vertex program
while (true) {
memory.setInTask(true);
- viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+ viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
memory.setInTask(false);
if (this.vertexProgram.terminate(memory))
break;
@@ -206,24 +206,24 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
memory.broadcastMemory(sparkContext);
}
}
- // write the graph rdd using the output rdd
+ // write the computed graph to the respective output (rdd or output format)
final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
- finalGraphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys);
+ computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, elementComputeKeys);
if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) && !this.persist.equals(Persist.NOTHING)) {
try {
hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
.newInstance()
- .writeGraphRDD(apacheConfiguration, finalGraphRDD);
+ .writeGraphRDD(apacheConfiguration, computedGraphRDD);
} catch (final InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
- final boolean finalGraphComputed = finalGraphRDD != null;
- if (!finalGraphComputed)
- finalGraphRDD = graphRDD;
+ final boolean computedGraphCreated = computedGraphRDD != null;
+ if (!computedGraphCreated)
+ computedGraphRDD = loadedGraphRDD;
final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
@@ -231,25 +231,24 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// process the map reducers //
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
- finalGraphRDD = finalGraphRDD.mapValues(vertexWritable -> {
+ computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
vertexWritable.get().dropEdges();
return vertexWritable;
});
- if (!outputToSpark)
- finalGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY"))); // drop all the edges of the graph as they are not used in mapReduce processing
- if (!inputFromSpark && finalGraphComputed) // if there was a final graph computed (that is, a vertex program executed), then drop the loaded graph
- graphRDD.unpersist();
+ if (!outputToSpark && computedGraphCreated) // if the computed graph wasn't already persisted to spark, persist it here
+ computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY"))); // the edges were dropped above as they are not used in mapReduce processing
+
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
- final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) finalGraphRDD, mapReduce, newApacheConfiguration);
+ final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) computedGraphRDD, mapReduce, newApacheConfiguration);
// combine
final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
// reduce
final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
- // write the map reduce output back to disk (memory)
+ // write the map reduce output back to disk and computer result memory
try {
mapReduce.addResultToMemory(finalMemory,
hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
@@ -261,13 +260,13 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
}
- // unpersist the loaded graphRDD as it will never be used again
- if (!inputFromSpark && finalGraphComputed)
- graphRDD.unpersist();
- // unpersist the final computed graphRDD if it will not be used again (no PersistedOutputRDD)
+ // unpersist the loaded graph if it will not be used again (no PersistedInputRDD)
+ if (!inputFromSpark && computedGraphCreated)
+ loadedGraphRDD.unpersist();
+ // unpersist the computed graph if it will not be used again (no PersistedOutputRDD)
if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
- finalGraphRDD.unpersist();
- // delete any file system output if persist nothing
+ computedGraphRDD.unpersist();
+ // delete any file system or rdd data if persist nothing
if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
if (outputToHDFS)
fileSystemStorage.rm(outputLocation);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f8a25220/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index e2d77c4..19085da 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -25,8 +25,8 @@ import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -64,6 +64,8 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
@Override
public GraphTraversalSource traversal(final Graph graph) {
- return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class).workers(new Random().nextInt(3)+1)).create(graph);
+ return RANDOM.nextBoolean() ?
+ GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class).workers(RANDOM.nextInt(3) + 1)).create(graph) :
+ GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph);
}
}
\ No newline at end of file