You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by fl...@apache.org on 2018/05/23 16:34:15 UTC
[11/23] tinkerpop git commit: TINKERPOP-1113 Added spark configuration options as concrete methods CTR
TINKERPOP-1113 Added spark configuration options as concrete methods CTR
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/bd85e5fe
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/bd85e5fe
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/bd85e5fe
Branch: refs/heads/TINKERPOP-1897
Commit: bd85e5febee56434c4de4e7ab31e3444437a9f5e
Parents: f36eb4f
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue May 22 06:55:46 2018 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue May 22 06:55:46 2018 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
.../process/computer/SparkGraphComputer.java | 90 +++++++++++++++++---
.../computer/SparkHadoopGraphProvider.java | 5 +-
3 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 21fde2c..395bb55 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Removed recursive handling of streaming results from Gremlin-Python driver to avoid max recursion depth errors.
* Improved performance of `TraversalVertexProgram` and related infrastructure.
* Fixed bug in `GroovyTranslator` that didn't properly handle empty `Map` objects.
+* Added concrete configuration methods to `SparkGraphComputer` to provide a clearer API for configuring it.
[[release-3-2-9]]
=== TinkerPop 3.2.9 (Release Date: May 8, 2018)
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 00a2e46..4c896cd 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -30,11 +30,10 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.serializer.Serializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
@@ -79,7 +78,16 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_CONTEXT;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_PARTITIONER;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_SERIALIZER;
+
/**
+ * {@link GraphComputer} implementation for Apache Spark.
+ *
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@@ -107,8 +115,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
}
+ /**
+ * Sets the number of workers. If the {@code spark.master} configuration is configured with "local" then it will
+ * change that configuration to use the specified number of worker threads.
+ */
@Override
- public GraphComputer workers(final int workers) {
+ public SparkGraphComputer workers(final int workers) {
super.workers(workers);
if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
@@ -118,11 +130,61 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
@Override
- public GraphComputer configure(final String key, final Object value) {
+ public SparkGraphComputer configure(final String key, final Object value) {
this.sparkConfiguration.setProperty(key, value);
return this;
}
+ /**
+ * Sets the configuration option for {@code spark.master}, the cluster manager to connect to. The value may be
+ * one of the <a href="https://spark.apache.org/docs/latest/submitting-applications.html#master-urls">allowed master URLs</a>.
+ */
+ public SparkGraphComputer master(final String clusterManager) {
+ return configure(SparkLauncher.SPARK_MASTER, clusterManager);
+ }
+
+ /**
+ * Determines if the Spark context should be left open, preventing Spark from garbage collecting unreferenced RDDs.
+ */
+ public SparkGraphComputer persistContext(final boolean persist) {
+ return configure(GREMLIN_SPARK_PERSIST_CONTEXT, persist);
+ }
+
+ /**
+ * Specifies the method by which the {@link VertexProgram} created graph is persisted. By default, it is configured
+ * to use {@code StorageLevel#MEMORY_ONLY()}.
+ */
+ public SparkGraphComputer graphStorageLevel(final StorageLevel storageLevel) {
+ return configure(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, storageLevel.description());
+ }
+
+ public SparkGraphComputer persistStorageLevel(final StorageLevel storageLevel) {
+ return configure(GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel.description());
+ }
+
+ /**
+ * Determines if the graph RDD should be partitioned or not. By default, this value is {@code false}.
+ */
+ public SparkGraphComputer skipPartitioner(final boolean skip) {
+ return configure(GREMLIN_SPARK_SKIP_PARTITIONER, skip);
+ }
+
+ /**
+ * Determines if the graph RDD should be cached or not. If {@code true} then
+ * {@link #graphStorageLevel(StorageLevel)} is ignored. By default, this value is {@code false}.
+ */
+ public SparkGraphComputer skipGraphCache(final boolean skip) {
+ return configure(GREMLIN_SPARK_SKIP_GRAPH_CACHE, skip);
+ }
+
+ /**
+ * Specifies the {@code org.apache.spark.serializer.Serializer} implementation to use. By default, this value is
+ * set to {@link GryoSerializer}.
+ */
+ public SparkGraphComputer serializer(final Class<? extends Serializer> serializer) {
+ return configure(SPARK_SERIALIZER, serializer.getCanonicalName());
+ }
+
@Override
public Future<ComputerResult> submit() {
this.validateStatePriorToExecution();
@@ -135,8 +197,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
final long startTime = System.currentTimeMillis();
// apache and hadoop configurations that are used throughout the graph computer computation
final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
- if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
- graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+ if (!graphComputerConfiguration.containsKey(SPARK_SERIALIZER))
+ graphComputerConfiguration.setProperty(SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
@@ -144,8 +206,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
- final boolean skipPartitioner = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
- final boolean skipPersist = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
+ final boolean skipPartitioner = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_PARTITIONER, false);
+ final boolean skipPersist = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
if (inputFromHDFS) {
String inputLocation = Constants
.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
@@ -230,7 +292,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
assert loadedGraphRDD.partitioner().isPresent();
} else {
assert skipPartitioner == !loadedGraphRDD.partitioner().isPresent(); // no easy way to test this with a test case
- this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+ this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + GREMLIN_SPARK_SKIP_PARTITIONER);
}
}
// if the loaded graphRDD was already partitioned previous, then this coalesce/repartition will not take place
@@ -242,7 +304,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
// persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
if (!skipPersist && (!inputFromSpark || partitioned || filtered))
- loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+ loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
// final graph with view (for persisting and/or mapReducing -- may be null and thus, possible to save space/time)
JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
@@ -323,7 +385,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
});
// if there is only one MapReduce to execute, don't bother wasting the clock cycles.
if (this.mapReducers.size() > 1)
- mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+ mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
}
for (final MapReduce mapReduce : this.mapReducers) {
@@ -370,11 +432,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// clear properties that should not be propagated in an OLAP chain
graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
- graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
- graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+ graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_GRAPH_CACHE);
+ graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_PARTITIONER);
return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
} finally {
- if (!graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+ if (!graphComputerConfiguration.getBoolean(GREMLIN_SPARK_PERSIST_CONTEXT, false))
Spark.close();
}
});
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index d4201b5..469c4b1 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.spark.process.computer;
+import org.apache.spark.launcher.SparkLauncher;
import org.apache.tinkerpop.gremlin.GraphProvider;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.groovy.util.SugarTestHelper;
@@ -82,8 +83,8 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider {
}
config.put(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
- config.put("spark.master", "local[4]");
- config.put("spark.serializer", GryoSerializer.class.getCanonicalName());
+ config.put(SparkLauncher.SPARK_MASTER, "local[4]");
+ config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
config.put("spark.kryo.registrationRequired", true);
return config;
}