You are viewing a plain-text version of this content. The canonical (formatted) version is available at the original archive link, which is not preserved in this copy.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/28 00:02:06 UTC
incubator-tinkerpop git commit: more test cases around RDD persistence.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master f8a252202 -> 1e5b8db3d
More test cases around RDD persistence; also skip persisting the computed graph RDD when only a single MapReduce job will run.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/1e5b8db3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/1e5b8db3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/1e5b8db3
Branch: refs/heads/master
Commit: 1e5b8db3d4b0656d294c2fa0cebc4aa658970fb6
Parents: f8a2522
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 27 16:02:00 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 27 16:02:00 2016 -0700
----------------------------------------------------------------------
.../gremlin/spark/process/computer/SparkGraphComputer.java | 7 +++++--
.../spark/structure/io/PersistedInputOutputRDDTest.java | 5 +++--
2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1e5b8db3/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 59da7d6..0419fc3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -231,12 +231,15 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// process the map reducers //
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
+ // drop all the edges of the graph as they are not used in mapReduce processing
computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
vertexWritable.get().dropEdges();
return vertexWritable;
});
- if (!outputToSpark && computedGraphCreated) // if the computed graph wasn't already persisted to spark, persist it here
- computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY"))); // drop all the edges of the graph as they are not used in mapReduce processing
+ // if the computed graph wasn't already persisted, persist it here for all the MapReduce jobs to reuse
+ // however, if there is only one MapReduce to execute, don't bother wasting the clock cycles.
+ if (!outputToSpark && computedGraphCreated && this.mapReducers.size() > 1)
+ computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1e5b8db3/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 6dac97f..93d83b9 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankMapReduce;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -165,7 +166,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
.program(TraversalVertexProgram.build()
.traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
"gremlin-groovy",
- "g.V()").create(graph)).submit().get();
+ "g.V().count()").create(graph)).submit().get();
////////
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
///////
@@ -309,7 +310,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2);
////
graph = GraphFactory.open(configuration);
- graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
+ graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).mapReduce(PageRankMapReduce.build().create()).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
g = graph.traversal();
assertEquals(6l, g.V().count().next().longValue());
assertEquals(6l, g.E().count().next().longValue());