You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/25 22:22:13 UTC
incubator-tinkerpop git commit: Found another bug in my manual
testing of Spark. If the SparkContext is open,
the graphRDD prior to finalization is persisted -- eek. Made it so that it is
unpersisted() correctly. Moreover, added test cases to ensure that
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 2ca4cba87 -> 8646af7d6
Found another bug in my manual testing of Spark. If the SparkContext is open, the graphRDD prior to finalization is persisted -- eek. Made it so that it is unpersisted() correctly. Moreover, added test cases to ensure that unnamed persisted RDDs are not dangling around between jobs on a persisted context. Also, found a NullPointerException. Ran full spark integration tests. CTR.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8646af7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8646af7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8646af7d
Branch: refs/heads/master
Commit: 8646af7d6a3a5dafcc5c3ffc5e8ceccbd9a11735
Parents: 2ca4cba
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jan 25 14:22:09 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 25 14:22:09 2016 -0700
----------------------------------------------------------------------
.../spark/process/computer/SparkExecutor.java | 7 +++--
.../process/computer/SparkGraphComputer.java | 2 +-
.../spark/structure/io/SparkContextStorage.java | 2 +-
.../io/PersistedInputOutputRDDTest.java | 27 +++++++++++++++++++-
4 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8646af7d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 087db62..d3f31cb 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -133,9 +133,9 @@ public final class SparkExecutor {
return newViewIncomingRDD;
}
- public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
+ public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys, final boolean unpersistInput) {
// attach the final computed view to the cached graph
- return graphRDD.leftOuterJoin(viewIncomingRDD)
+ final JavaPairRDD<Object, VertexWritable> finalGraphRDD = graphRDD.leftOuterJoin(viewIncomingRDD)
.mapValues(tuple -> {
final StarGraph.StarVertex vertex = tuple._1().get();
vertex.dropVertexProperties(elementComputeKeys);
@@ -144,6 +144,9 @@ public final class SparkExecutor {
view.clear(); // no longer needed so kill it from memory
return tuple._1();
});
+ if (unpersistInput)
+ graphRDD.unpersist();
+ return finalGraphRDD;
}
/////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8646af7d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index e682c72..30f4dba 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -207,7 +207,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
// write the graph rdd using the output rdd
final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
- graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys);
+ graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys, !inputFromSpark);
if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) &&
!this.persist.equals(GraphComputer.Persist.NOTHING)) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8646af7d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
index 299d973..76a3299 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -160,6 +160,6 @@ public final class SparkContextStorage implements Storage {
@Override
public String toString() {
- return StringFactory.storageString(Spark.getContext().master());
+ return StringFactory.storageString(null == Spark.getContext() ? "spark:none" : Spark.getContext().master());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8646af7d/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 895df01..6dac97f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
@@ -56,11 +57,32 @@ import static org.junit.Assert.assertTrue;
public class PersistedInputOutputRDDTest extends AbstractSparkTest {
@Test
+ public void shouldNotHaveDanglingPersistedComputeRDDs() throws Exception {
+ Spark.create("local[4]");
+ final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
+ final Configuration configuration = super.getBaseConfiguration();
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+ configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+ Graph graph = GraphFactory.open(configuration);
+ assertEquals(6, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().out().count().next().longValue());
+ ////////
+ assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertEquals(0, Spark.getContext().getPersistentRDDs().size());
+ ///////
+ Spark.close();
+ }
+
+
+ @Test
public void shouldPersistRDDBasedOnStorageLevel() throws Exception {
Spark.create("local[4]");
int counter = 0;
- for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY","MEMORY_ONLY_SER","MEMORY_AND_DISK_SER","OFF_HEAP")) {
+ for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER", "OFF_HEAP")) {
assertEquals(counter * 2, Spark.getRDDs().size());
+ assertEquals(counter * 2, Spark.getContext().getPersistentRDDs().size());
counter++;
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
final Configuration configuration = new BaseConfiguration();
@@ -88,6 +110,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))));
assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))).getStorageLevel());
assertEquals(counter * 2, Spark.getRDDs().size());
+ assertEquals(counter * 2, Spark.getContext().getPersistentRDDs().size());
//System.out.println(SparkContextStorage.open().ls());
}
Spark.close();
@@ -194,6 +217,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
.submit().get();
////
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertEquals(1, Spark.getContext().getPersistentRDDs().size());
////
final Graph graph = TinkerGraph.open();
final GraphTraversalSource g = graph.traversal();
@@ -237,6 +261,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
////
Spark.create(readConfiguration);
assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertEquals(1, Spark.getContext().getPersistentRDDs().size());
////
final Graph graph = TinkerGraph.open();
final GraphTraversalSource g = graph.traversal();