You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/25 22:22:13 UTC

incubator-tinkerpop git commit: Found another bug in my manual testing of Spark. If the SparkContext is open, the graphRDD prior to finalization is persited -- eek. Made it so that it is unpersisted() correctly. Moreover, added test cases to ensure that

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 2ca4cba87 -> 8646af7d6


Found another bug in my manual testing of Spark. If the SparkContext is open, the graphRDD prior to finalization is persited -- eek. Made it so that it is unpersisted() correctly. Moreover, added test cases to ensure that unnamed persited RDDs are not dangling around between jobs on a persited context. Also, found a NullPointerException. Ran full spark integration tests. CTR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8646af7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8646af7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8646af7d

Branch: refs/heads/master
Commit: 8646af7d6a3a5dafcc5c3ffc5e8ceccbd9a11735
Parents: 2ca4cba
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jan 25 14:22:09 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 25 14:22:09 2016 -0700

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   |  7 +++--
 .../process/computer/SparkGraphComputer.java    |  2 +-
 .../spark/structure/io/SparkContextStorage.java |  2 +-
 .../io/PersistedInputOutputRDDTest.java         | 27 +++++++++++++++++++-
 4 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8646af7d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 087db62..d3f31cb 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -133,9 +133,9 @@ public final class SparkExecutor {
         return newViewIncomingRDD;
     }
 
-    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys, final boolean unpersistInput) {
         // attach the final computed view to the cached graph
-        return graphRDD.leftOuterJoin(viewIncomingRDD)
+        final JavaPairRDD<Object, VertexWritable> finalGraphRDD = graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {
                     final StarGraph.StarVertex vertex = tuple._1().get();
                     vertex.dropVertexProperties(elementComputeKeys);
@@ -144,6 +144,9 @@ public final class SparkExecutor {
                     view.clear(); // no longer needed so kill it from memory
                     return tuple._1();
                 });
+        if (unpersistInput)
+            graphRDD.unpersist();
+        return finalGraphRDD;
     }
 
     /////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8646af7d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index e682c72..30f4dba 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -207,7 +207,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     }
                     // write the graph rdd using the output rdd
                     final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
-                    graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys);
+                    graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys, !inputFromSpark);
                     if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
                             hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) &&
                             !this.persist.equals(GraphComputer.Persist.NOTHING)) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8646af7d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
index 299d973..76a3299 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -160,6 +160,6 @@ public final class SparkContextStorage implements Storage {
 
     @Override
     public String toString() {
-        return StringFactory.storageString(Spark.getContext().master());
+        return StringFactory.storageString(null == Spark.getContext() ? "spark:none" : Spark.getContext().master());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8646af7d/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 895df01..6dac97f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
@@ -56,11 +57,32 @@ import static org.junit.Assert.assertTrue;
 public class PersistedInputOutputRDDTest extends AbstractSparkTest {
 
     @Test
+    public void shouldNotHaveDanglingPersistedComputeRDDs() throws Exception {
+        Spark.create("local[4]");
+        final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
+        final Configuration configuration = super.getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(configuration);
+        assertEquals(6, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().out().count().next().longValue());
+        ////////
+        assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(0, Spark.getContext().getPersistentRDDs().size());
+        ///////
+        Spark.close();
+    }
+
+
+    @Test
     public void shouldPersistRDDBasedOnStorageLevel() throws Exception {
         Spark.create("local[4]");
         int counter = 0;
-        for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY","MEMORY_ONLY_SER","MEMORY_AND_DISK_SER","OFF_HEAP")) {
+        for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER", "OFF_HEAP")) {
             assertEquals(counter * 2, Spark.getRDDs().size());
+            assertEquals(counter * 2, Spark.getContext().getPersistentRDDs().size());
             counter++;
             final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
             final Configuration configuration = new BaseConfiguration();
@@ -88,6 +110,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
             assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))));
             assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))).getStorageLevel());
             assertEquals(counter * 2, Spark.getRDDs().size());
+            assertEquals(counter * 2, Spark.getContext().getPersistentRDDs().size());
             //System.out.println(SparkContextStorage.open().ls());
         }
         Spark.close();
@@ -194,6 +217,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
                 .submit().get();
         ////
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(1, Spark.getContext().getPersistentRDDs().size());
         ////
         final Graph graph = TinkerGraph.open();
         final GraphTraversalSource g = graph.traversal();
@@ -237,6 +261,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         ////
         Spark.create(readConfiguration);
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertEquals(1, Spark.getContext().getPersistentRDDs().size());
         ////
         final Graph graph = TinkerGraph.open();
         final GraphTraversalSource g = graph.traversal();