You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/12 15:29:18 UTC

[1/3] incubator-tinkerpop git commit: Allow the user to specify the persistence StorageLevel for both the computed job graph and any PersistedOutputRDD data. Updated docs, example conf, and added a test case that validates that persisted to SparkStorage

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 3395f2b3c -> 039828bdc


Allow the user to specify the persistence StorageLevel for both the computed job graph and any PersistedOutputRDD data. Updated docs, example conf, and added a test case that validates that persisted to SparkStorage is correct as the configuration changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4082a4a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4082a4a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4082a4a0

Branch: refs/heads/master
Commit: 4082a4a043b54c102f49f220b14e2644817e1222
Parents: 114609d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Jan 8 11:05:08 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 8 11:05:08 2016 -0700

----------------------------------------------------------------------
 docs/src/reference/implementations.asciidoc     |  2 +
 hadoop-gremlin/conf/hadoop-gryo.properties      |  2 +
 .../tinkerpop/gremlin/hadoop/Constants.java     |  2 +
 .../hadoop/structure/HadoopConfiguration.java   | 10 ++---
 .../process/computer/SparkGraphComputer.java    |  4 +-
 .../gremlin/spark/structure/Spark.java          |  3 +-
 .../spark/structure/io/PersistedOutputRDD.java  | 10 +++--
 .../io/PersistedInputOutputRDDTest.java         | 40 ++++++++++++++++++++
 8 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/docs/src/reference/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/implementations.asciidoc b/docs/src/reference/implementations.asciidoc
index add8555..661c951 100644
--- a/docs/src/reference/implementations.asciidoc
+++ b/docs/src/reference/implementations.asciidoc
@@ -1200,7 +1200,9 @@ image::spark-algorithm.png[width=775]
 |Property |Description
 |gremlin.spark.graphInputRDD |A class for creating RDD's from underlying graph data, defaults to Hadoop `InputFormat`.
 |gremlin.spark.graphOutputRDD |A class for output RDD's, defaults to Hadoop `OutputFormat`.
+|gremlin.spark.graphStorageLevel |What `StorageLevel` to use for the cached graph during job execution (default `MEMORY_ONLY`).
 |gremlin.spark.persistContext |Whether to create a new `SparkContext` for every `SparkGraphComputer` or to reuse an existing one.
+|gremlin.spark.persistStorageLevel |What `StorageLevel` to use when persisted RDDs via `PersistedOutputRDD` (default `MEMORY_ONLY`).
 |========================================================
 
 If the provider/user wishes to not use Hadoop `InputFormats`, it is possible to leverage Spark's RDD

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/hadoop-gremlin/conf/hadoop-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-gryo.properties b/hadoop-gremlin/conf/hadoop-gryo.properties
index 9bbd41b..97188f2 100644
--- a/hadoop-gremlin/conf/hadoop-gryo.properties
+++ b/hadoop-gremlin/conf/hadoop-gryo.properties
@@ -28,8 +28,10 @@ gremlin.hadoop.outputLocation=output
 spark.master=local[4]
 spark.executor.memory=1g
 spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
+# gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
 # gremlin.spark.persistContext=true
 # gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
+# gremlin.soark.persistStorageLevel=DISK_ONLY
 # spark.kryo.registrationRequired=true
 # spark.storage.memoryFraction=0.2
 # spark.eventLog.enabled=true

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 4a91106..aa0bca5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -55,6 +55,8 @@ public final class Constants {
     public static final String GREMLIN_SPARK_GRAPH_INPUT_RDD = "gremlin.spark.graphInputRDD";
     public static final String GREMLIN_SPARK_GRAPH_OUTPUT_RDD = "gremlin.spark.graphOutputRDD";
     public static final String GREMLIN_SPARK_PERSIST_CONTEXT = "gremlin.spark.persistContext";
+    public static final String GREMLIN_SPARK_GRAPH_STORAGE_LEVEL = "gremlin.spark.graphStorageLevel";
+    public static final String GREMLIN_SPARK_PERSIST_STORAGE_LEVEL = "gremlin.spark.persistStorageLevel";
 
     public static String getGraphLocation(final String location) {
         return location.endsWith("/") ? location + Constants.HIDDEN_G : location + "/" + Constants.HIDDEN_G;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index d4578b4..244ead9 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -45,6 +45,11 @@ public final class HadoopConfiguration extends AbstractConfiguration implements
         super.setDelimiterParsingDisabled(true);
     }
 
+    public HadoopConfiguration(final Configuration configuration) {
+        this();
+        this.copy(configuration);
+    }
+
     @Override
     protected void addPropertyDirect(final String key, final Object value) {
         this.properties.put(key, value);
@@ -55,11 +60,6 @@ public final class HadoopConfiguration extends AbstractConfiguration implements
         this.properties.remove(key);
     }
 
-    public HadoopConfiguration(final Configuration configuration) {
-        this();
-        this.copy(configuration);
-    }
-
     @Override
     public boolean isEmpty() {
         return this.properties.isEmpty();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index b48fac5..a87f95f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -32,6 +32,7 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
@@ -160,7 +161,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                             .readGraphRDD(apacheConfiguration, sparkContext);
                     if (this.workersSet && graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
                         graphRDD = graphRDD.coalesce(this.workers);
-                    graphRDD = graphRDD.cache();
+                    // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
+                    graphRDD = graphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
                 } catch (final InstantiationException | IllegalAccessException e) {
                     throw new IllegalStateException(e.getMessage(), e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
index 0bf679b..1c8c41b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
@@ -109,8 +109,9 @@ public class Spark {
     }
 
     public static void close() {
+        NAME_TO_RDD.clear();
         if (null != CONTEXT)
             CONTEXT.stop();
-        NAME_TO_RDD.clear();
+        CONTEXT = null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
index 27b87f5..4ae6248 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
@@ -45,13 +46,15 @@ public final class PersistedOutputRDD implements OutputRDD {
         if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
             throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
         SparkContextStorage.open(configuration).rm(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));  // this might be bad cause it unpersists the job RDD
+        // determine which storage level to persist the RDD as with MEMORY_ONLY being the default cache()
+        final StorageLevel storageLevel = StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"));
         if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true))
             graphRDD.mapValues(vertex -> {
                 vertex.get().dropEdges();
                 return vertex;
-            }).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).cache();
+            }).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel);
         else
-            graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).cache();
+            graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel);
         Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
     }
 
@@ -63,7 +66,8 @@ public final class PersistedOutputRDD implements OutputRDD {
             throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
         final String memoryRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey);
         Spark.removeRDD(memoryRDDName);
-        memoryRDD.setName(memoryRDDName).cache();
+        memoryRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY")));
+        Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
         return IteratorUtils.map(memoryRDD.toLocalIterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index e153c4e..5076e0b 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
 
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -42,6 +43,7 @@ import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
@@ -54,6 +56,44 @@ import static org.junit.Assert.assertTrue;
 public class PersistedInputOutputRDDTest extends AbstractSparkTest {
 
     @Test
+    public void shouldPersistRDDBasedOnStorageLevel() throws Exception {
+        Spark.create("local[4]");
+        int counter = 0;
+        for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY","MEMORY_ONLY_SER","MEMORY_AND_DISK_SER","OFF_HEAP")) {
+            assertEquals(counter * 2, Spark.getRDDs().size());
+            counter++;
+            final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
+            final Configuration configuration = new BaseConfiguration();
+            configuration.setProperty("spark.master", "local[4]");
+            configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+            configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+            configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+            configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+            configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+            configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel);
+            configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+            configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+            configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+            Graph graph = GraphFactory.open(configuration);
+            graph.compute(SparkGraphComputer.class)
+                    .result(GraphComputer.ResultGraph.NEW)
+                    .persist(GraphComputer.Persist.EDGES)
+                    .program(TraversalVertexProgram.build()
+                            .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                    "gremlin-groovy",
+                                    "g.V()").create(graph)).submit().get();
+            ////////
+            assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+            assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel());
+            assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))));
+            assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))).getStorageLevel());
+            assertEquals(counter * 2, Spark.getRDDs().size());
+            //System.out.println(SparkContextStorage.open().ls());
+        }
+        Spark.close();
+    }
+
+    @Test
     public void shouldNotPersistRDDAcrossJobs() throws Exception {
         Spark.create("local[4]");
         final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());


[3/3] incubator-tinkerpop git commit: Updated CHANGELOG.

Posted by ok...@apache.org.
Updated CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/039828bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/039828bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/039828bd

Branch: refs/heads/master
Commit: 039828bdc77d7557b3ac69acb9dcecc3eff00782
Parents: 699da44
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 12 07:29:15 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 12 07:29:15 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/039828bd/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index fdbfb90..2a19faa 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.1 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* The Spark persistence `StorageLevel` can now be set for both job graphs and `PersistedOutputRDD` data.
 * Fixed a bug around duration calculations of `cap()`-step during profiling.
 * It is possible to completely avoid using HDFS with Spark if `PersistedInputRDD` and `PersistedOutpuRDD` are leveraged.
 * `InputRDD` and `OutputRDD` can now process both graphs and memory (i.e. sideEffects).


[2/3] incubator-tinkerpop git commit: Merge branch 'TINKERPOP-1072'

Posted by ok...@apache.org.
Merge branch 'TINKERPOP-1072'


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/699da44c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/699da44c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/699da44c

Branch: refs/heads/master
Commit: 699da44ce88ae163a776ae7dd6fd710847b0d6b6
Parents: 3395f2b 4082a4a
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 12 07:19:10 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 12 07:19:10 2016 -0700

----------------------------------------------------------------------
 docs/src/reference/implementations.asciidoc     |  2 +
 hadoop-gremlin/conf/hadoop-gryo.properties      |  2 +
 .../tinkerpop/gremlin/hadoop/Constants.java     |  2 +
 .../hadoop/structure/HadoopConfiguration.java   | 10 ++---
 .../process/computer/SparkGraphComputer.java    |  4 +-
 .../gremlin/spark/structure/Spark.java          |  3 +-
 .../spark/structure/io/PersistedOutputRDD.java  | 10 +++--
 .../io/PersistedInputOutputRDDTest.java         | 40 ++++++++++++++++++++
 8 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------