You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/28 19:29:23 UTC
incubator-tinkerpop git commit: more test cases around persisted
rdds. found a minor bug when the persisted rdd was an input rdd. tweaked the
Spark Application name so it looks nicer in the SparkServer UI. CTR.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master aaf2e94bc -> 93b112d40
more test cases around persisted rdds. found a minor bug when the persisted rdd was an input rdd. tweaked the Spark Application name so it looks nicer in the SparkServer UI. CTR.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/93b112d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/93b112d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/93b112d4
Branch: refs/heads/master
Commit: 93b112d40a42fd47e38ed69e0311a28254475ea7
Parents: aaf2e94
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 28 11:29:12 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 28 11:29:12 2016 -0700
----------------------------------------------------------------------
.../process/computer/SparkGraphComputer.java | 19 +++--
.../gremlin/spark/structure/Spark.java | 4 +-
.../io/PersistedInputOutputRDDTest.java | 86 ++++++++------------
3 files changed, 48 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/93b112d4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 0419fc3..a38db91 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -231,15 +231,16 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// process the map reducers //
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
- // drop all the edges of the graph as they are not used in mapReduce processing
- computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
- vertexWritable.get().dropEdges();
- return vertexWritable;
- });
- // if the computed graph wasn't already persisted, persist it here for all the MapReduce jobs to reuse
- // however, if there is only one MapReduce to execute, don't bother wasting the clock cycles.
- if (!outputToSpark && computedGraphCreated && this.mapReducers.size() > 1)
- computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+ if (computedGraphCreated && !outputToSpark) {
+ // drop all the edges of the graph as they are not used in mapReduce processing
+ computedGraphRDD = computedGraphRDD.mapValues(vertexWritable -> {
+ vertexWritable.get().dropEdges();
+ return vertexWritable;
+ });
+ // if there is only one MapReduce to execute, don't bother wasting the clock cycles.
+ if (this.mapReducers.size() > 1)
+ computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+ }
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/93b112d4/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
index 1c8c41b..08639f4 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
@@ -49,13 +49,13 @@ public class Spark {
public static void create(final Configuration configuration) {
final SparkConf sparkConf = new SparkConf();
configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, configuration.getProperty(key).toString()));
- sparkConf.setAppName("Spark-Gremlin Persisted Context Application");
+ sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
CONTEXT = SparkContext.getOrCreate(sparkConf);
}
public static void create(final String master) {
final SparkConf sparkConf = new SparkConf();
- sparkConf.setAppName("Spark-Gremlin Persisted Context Application");
+ sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
sparkConf.setMaster(master);
CONTEXT = SparkContext.getOrCreate(sparkConf);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/93b112d4/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 93d83b9..8dd677f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -24,7 +24,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
@@ -38,7 +37,6 @@ import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.IoCore;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
@@ -81,20 +79,16 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
public void shouldPersistRDDBasedOnStorageLevel() throws Exception {
Spark.create("local[4]");
int counter = 0;
- for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER", "OFF_HEAP")) {
- assertEquals(counter * 2, Spark.getRDDs().size());
- assertEquals(counter * 2, Spark.getContext().getPersistentRDDs().size());
+ for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER")) {
+ assertEquals(counter * 3, Spark.getRDDs().size());
+ assertEquals(counter * 3, Spark.getContext().getPersistentRDDs().size());
counter++;
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ final Configuration configuration = super.getBaseConfiguration();
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel);
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
Graph graph = GraphFactory.open(configuration);
@@ -104,14 +98,16 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
.program(TraversalVertexProgram.build()
.traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
"gremlin-groovy",
- "g.V()").create(graph)).submit().get();
+ "g.V().groupCount('m').by('name').out()").create(graph)).submit().get();
////////
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
- assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel());
+ assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, "m")));
assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))));
+ assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel());
+ assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getMemoryLocation(rddName, "m")).getStorageLevel());
assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))).getStorageLevel());
- assertEquals(counter * 2, Spark.getRDDs().size());
- assertEquals(counter * 2, Spark.getContext().getPersistentRDDs().size());
+ assertEquals(counter * 3, Spark.getRDDs().size());
+ assertEquals(counter * 3, Spark.getContext().getPersistentRDDs().size());
//System.out.println(SparkContextStorage.open().ls());
}
Spark.close();
@@ -121,14 +117,10 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
public void shouldNotPersistRDDAcrossJobs() throws Exception {
Spark.create("local[4]");
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ final Configuration configuration = super.getBaseConfiguration();
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false); // because the spark context is NOT persisted, neither is the RDD
Graph graph = GraphFactory.open(configuration);
@@ -142,6 +134,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
////////
Spark.create("local[4]");
assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertEquals(0, Spark.getContext().getPersistentRDDs().size());
Spark.close();
}
@@ -149,15 +142,12 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
public void shouldPersistRDDAcrossJobs() throws Exception {
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
+ final Configuration configuration = super.getBaseConfiguration();
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
Graph graph = GraphFactory.open(configuration);
graph.compute(SparkGraphComputer.class)
@@ -167,38 +157,31 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
.traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
"gremlin-groovy",
"g.V().count()").create(graph)).submit().get();
- ////////
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("reducing"))));
+ assertEquals(2, Spark.getContext().getPersistentRDDs().size());
///////
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2);
graph = GraphFactory.open(configuration);
- graph.compute(SparkGraphComputer.class)
- .result(GraphComputer.ResultGraph.NEW)
- .persist(GraphComputer.Persist.NOTHING)
- .program(TraversalVertexProgram.build()
- .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
- "gremlin-groovy",
- "g.V()").create(graph)).submit().get();
+ assertEquals(6, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().out().count().next().longValue());
+ assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("reducing"))));
+ assertEquals(2, Spark.getContext().getPersistentRDDs().size());
Spark.close();
}
@Test
public void testBulkLoaderVertexProgramChain() throws Exception {
Spark.create("local[4]");
-
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
- final Configuration readConfiguration = new BaseConfiguration();
- readConfiguration.setProperty("spark.master", "local[4]");
- readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ final Configuration readConfiguration = super.getBaseConfiguration();
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
- readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
Graph pageRankGraph = GraphFactory.open(readConfiguration);
///////////////
@@ -237,15 +220,11 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
Spark.create("local[4]");
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
- final Configuration readConfiguration = new BaseConfiguration();
- readConfiguration.setProperty("spark.master", "local[4]");
- readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ final Configuration readConfiguration = super.getBaseConfiguration();
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
- readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
Graph pageRankGraph = GraphFactory.open(readConfiguration);
///////////////
@@ -282,18 +261,15 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, "testComplexChain", "graphRDD");
final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, "testComplexChain", "graphRDD2");
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ final Configuration configuration = super.getBaseConfiguration();
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertEquals(0, Spark.getContext().getPersistentRDDs().size());
Graph graph = GraphFactory.open(configuration);
graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
GraphTraversalSource g = graph.traversal();
@@ -303,6 +279,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
////
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertEquals(1, Spark.getContext().getPersistentRDDs().size());
////
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
@@ -317,7 +294,10 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
////
+ assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
+ assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY)));
+ assertEquals(3, Spark.getContext().getPersistentRDDs().size());
////
graph = GraphFactory.open(configuration);
graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
@@ -327,7 +307,10 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
////
+ assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
+ assertFalse(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY)));
+ assertEquals(2, Spark.getContext().getPersistentRDDs().size());
////
graph = GraphFactory.open(configuration);
graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
@@ -338,7 +321,10 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
assertEquals(0l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
assertEquals(0l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
////
+ assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
+ assertFalse(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY)));
+ assertEquals(1, Spark.getContext().getPersistentRDDs().size());
Spark.close();
}
}