You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/08 19:05:16 UTC
incubator-tinkerpop git commit: Allow the user to specify the
persistence StorageLevel for both the computed job graph and any
PersistedOutputRDD data. Updated docs, example conf,
and added a test case that validates that data persisted to SparkStorage is correct
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1072 [created] 4082a4a04
Allow the user to specify the persistence StorageLevel for both the computed job graph and any PersistedOutputRDD data. Updated docs, example conf, and added a test case that validates that data persisted to SparkStorage is correct as the configuration changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4082a4a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4082a4a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4082a4a0
Branch: refs/heads/TINKERPOP-1072
Commit: 4082a4a043b54c102f49f220b14e2644817e1222
Parents: 114609d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Jan 8 11:05:08 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 8 11:05:08 2016 -0700
----------------------------------------------------------------------
docs/src/reference/implementations.asciidoc | 2 +
hadoop-gremlin/conf/hadoop-gryo.properties | 2 +
.../tinkerpop/gremlin/hadoop/Constants.java | 2 +
.../hadoop/structure/HadoopConfiguration.java | 10 ++---
.../process/computer/SparkGraphComputer.java | 4 +-
.../gremlin/spark/structure/Spark.java | 3 +-
.../spark/structure/io/PersistedOutputRDD.java | 10 +++--
.../io/PersistedInputOutputRDDTest.java | 40 ++++++++++++++++++++
8 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/docs/src/reference/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/implementations.asciidoc b/docs/src/reference/implementations.asciidoc
index add8555..661c951 100644
--- a/docs/src/reference/implementations.asciidoc
+++ b/docs/src/reference/implementations.asciidoc
@@ -1200,7 +1200,9 @@ image::spark-algorithm.png[width=775]
|Property |Description
|gremlin.spark.graphInputRDD |A class for creating RDD's from underlying graph data, defaults to Hadoop `InputFormat`.
|gremlin.spark.graphOutputRDD |A class for output RDD's, defaults to Hadoop `OutputFormat`.
+|gremlin.spark.graphStorageLevel |What `StorageLevel` to use for the cached graph during job execution (default `MEMORY_ONLY`).
|gremlin.spark.persistContext |Whether to create a new `SparkContext` for every `SparkGraphComputer` or to reuse an existing one.
+|gremlin.spark.persistStorageLevel |What `StorageLevel` to use when persisting RDDs via `PersistedOutputRDD` (default `MEMORY_ONLY`).
|========================================================
If the provider/user wishes to not use Hadoop `InputFormats`, it is possible to leverage Spark's RDD
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/hadoop-gremlin/conf/hadoop-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-gryo.properties b/hadoop-gremlin/conf/hadoop-gryo.properties
index 9bbd41b..97188f2 100644
--- a/hadoop-gremlin/conf/hadoop-gryo.properties
+++ b/hadoop-gremlin/conf/hadoop-gryo.properties
@@ -28,8 +28,10 @@ gremlin.hadoop.outputLocation=output
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
+# gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
# gremlin.spark.persistContext=true
# gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
+# gremlin.spark.persistStorageLevel=DISK_ONLY
# spark.kryo.registrationRequired=true
# spark.storage.memoryFraction=0.2
# spark.eventLog.enabled=true
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 4a91106..aa0bca5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -55,6 +55,8 @@ public final class Constants {
public static final String GREMLIN_SPARK_GRAPH_INPUT_RDD = "gremlin.spark.graphInputRDD";
public static final String GREMLIN_SPARK_GRAPH_OUTPUT_RDD = "gremlin.spark.graphOutputRDD";
public static final String GREMLIN_SPARK_PERSIST_CONTEXT = "gremlin.spark.persistContext";
+ public static final String GREMLIN_SPARK_GRAPH_STORAGE_LEVEL = "gremlin.spark.graphStorageLevel";
+ public static final String GREMLIN_SPARK_PERSIST_STORAGE_LEVEL = "gremlin.spark.persistStorageLevel";
public static String getGraphLocation(final String location) {
return location.endsWith("/") ? location + Constants.HIDDEN_G : location + "/" + Constants.HIDDEN_G;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index d4578b4..244ead9 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -45,6 +45,11 @@ public final class HadoopConfiguration extends AbstractConfiguration implements
super.setDelimiterParsingDisabled(true);
}
+ public HadoopConfiguration(final Configuration configuration) {
+ this();
+ this.copy(configuration);
+ }
+
@Override
protected void addPropertyDirect(final String key, final Object value) {
this.properties.put(key, value);
@@ -55,11 +60,6 @@ public final class HadoopConfiguration extends AbstractConfiguration implements
this.properties.remove(key);
}
- public HadoopConfiguration(final Configuration configuration) {
- this();
- this.copy(configuration);
- }
-
@Override
public boolean isEmpty() {
return this.properties.isEmpty();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index b48fac5..a87f95f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -32,6 +32,7 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
@@ -160,7 +161,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
.readGraphRDD(apacheConfiguration, sparkContext);
if (this.workersSet && graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
graphRDD = graphRDD.coalesce(this.workers);
- graphRDD = graphRDD.cache();
+ // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
+ graphRDD = graphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
} catch (final InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
index 0bf679b..1c8c41b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
@@ -109,8 +109,9 @@ public class Spark {
}
public static void close() {
+ NAME_TO_RDD.clear();
if (null != CONTEXT)
CONTEXT.stop();
- NAME_TO_RDD.clear();
+ CONTEXT = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
index 27b87f5..4ae6248 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
@@ -45,13 +46,15 @@ public final class PersistedOutputRDD implements OutputRDD {
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
SparkContextStorage.open(configuration).rm(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)); // this might be bad cause it unpersists the job RDD
+ // determine which storage level to persist the RDD as with MEMORY_ONLY being the default cache()
+ final StorageLevel storageLevel = StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"));
if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true))
graphRDD.mapValues(vertex -> {
vertex.get().dropEdges();
return vertex;
- }).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).cache();
+ }).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel);
else
- graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).cache();
+ graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel);
Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
}
@@ -63,7 +66,8 @@ public final class PersistedOutputRDD implements OutputRDD {
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
final String memoryRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey);
Spark.removeRDD(memoryRDDName);
- memoryRDD.setName(memoryRDDName).cache();
+ memoryRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY")));
+ Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
return IteratorUtils.map(memoryRDD.toLocalIterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4082a4a0/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index e153c4e..5076e0b 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
+import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
@@ -42,6 +43,7 @@ import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.junit.Test;
+import java.util.Arrays;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
@@ -54,6 +56,44 @@ import static org.junit.Assert.assertTrue;
public class PersistedInputOutputRDDTest extends AbstractSparkTest {
@Test
+ public void shouldPersistRDDBasedOnStorageLevel() throws Exception {
+ Spark.create("local[4]");
+ int counter = 0;
+ for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY","MEMORY_ONLY_SER","MEMORY_AND_DISK_SER","OFF_HEAP")) {
+ assertEquals(counter * 2, Spark.getRDDs().size());
+ counter++;
+ final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
+ final Configuration configuration = new BaseConfiguration();
+ configuration.setProperty("spark.master", "local[4]");
+ configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+ configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel);
+ configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+ configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+ configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+ Graph graph = GraphFactory.open(configuration);
+ graph.compute(SparkGraphComputer.class)
+ .result(GraphComputer.ResultGraph.NEW)
+ .persist(GraphComputer.Persist.EDGES)
+ .program(TraversalVertexProgram.build()
+ .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+ "gremlin-groovy",
+ "g.V()").create(graph)).submit().get();
+ ////////
+ assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+ assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel());
+ assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))));
+ assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("traversers"))).getStorageLevel());
+ assertEquals(counter * 2, Spark.getRDDs().size());
+ //System.out.println(SparkContextStorage.open().ls());
+ }
+ Spark.close();
+ }
+
+ @Test
public void shouldNotPersistRDDAcrossJobs() throws Exception {
Spark.create("local[4]");
final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());