You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by fl...@apache.org on 2018/05/23 16:34:15 UTC

[11/23] tinkerpop git commit: TINKERPOP-1113 Added spark configuration options as concrete methods CTR

TINKERPOP-1113 Added spark configuration options as concrete methods CTR


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/bd85e5fe
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/bd85e5fe
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/bd85e5fe

Branch: refs/heads/TINKERPOP-1897
Commit: bd85e5febee56434c4de4e7ab31e3444437a9f5e
Parents: f36eb4f
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue May 22 06:55:46 2018 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue May 22 06:55:46 2018 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../process/computer/SparkGraphComputer.java    | 90 +++++++++++++++++---
 .../computer/SparkHadoopGraphProvider.java      |  5 +-
 3 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 21fde2c..395bb55 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Removed recursive handling of streaming results from Gremlin-Python driver to avoid max recursion depth errors.
 * Improved performance of `TraversalVertexProgram` and related infrastructure.
 * Fixed bug in `GroovyTranslator` that didn't properly handle empty `Map` objects.
+* Added concrete configuration methods to `SparkGraphComputer` to make a more clear API for configuring it.
 
 [[release-3-2-9]]
 === TinkerPop 3.2.9 (Release Date: May 8, 2018)

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 00a2e46..4c896cd 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -30,11 +30,10 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.serializer.Serializer;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
@@ -79,7 +78,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_CONTEXT;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_PARTITIONER;
+import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_SERIALIZER;
+
 /**
+ * {@link GraphComputer} implementation for Apache Spark.
+ *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@@ -107,8 +115,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
     }
 
+    /**
+     * Sets the number of workers. If the {@code spark.master} configuration is configured with "local" then it will
+     * change that configuration to use the specified number of worker threads.
+     */
     @Override
-    public GraphComputer workers(final int workers) {
+    public SparkGraphComputer workers(final int workers) {
         super.workers(workers);
         if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
             this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
@@ -118,11 +130,61 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
     }
 
     @Override
-    public GraphComputer configure(final String key, final Object value) {
+    public SparkGraphComputer configure(final String key, final Object value) {
         this.sparkConfiguration.setProperty(key, value);
         return this;
     }
 
+    /**
+     * Sets the configuration option for {@code spark.master} which is the cluster manager to connect to which may be
+     * one of the <a href="https://spark.apache.org/docs/latest/submitting-applications.html#master-urls">allowed master URLs</a>.
+     */
+    public SparkGraphComputer master(final String clusterManager) {
+        return configure(SparkLauncher.SPARK_MASTER, clusterManager);
+    }
+
+    /**
+     * Determines if the Spark context should be left open preventing Spark from garbage collecting unreferenced RDDs.
+     */
+    public SparkGraphComputer persistContext(final boolean persist) {
+        return configure(GREMLIN_SPARK_PERSIST_CONTEXT, persist);
+    }
+
+    /**
+     * Specifies the method by which the {@link VertexProgram} created graph is persisted. By default, it is configured
+     * to use {@code StorageLevel#MEMORY_ONLY()}
+     */
+    public SparkGraphComputer graphStorageLevel(final StorageLevel storageLevel) {
+        return configure(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, storageLevel.description());
+    }
+
+    public SparkGraphComputer persistStorageLevel(final StorageLevel storageLevel) {
+        return configure(GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel.description());
+    }
+
+    /**
+     * Determines if the graph RDD should be partitioned or not. By default, this value is {@code false}.
+     */
+    public SparkGraphComputer skipPartitioner(final boolean skip) {
+        return configure(GREMLIN_SPARK_SKIP_PARTITIONER, skip);
+    }
+
+    /**
+     * Determines if the graph RDD should be cached or not. If {@code true} then
+     * {@link #graphStorageLevel(StorageLevel)} is ignored. By default, this value is {@code false}.
+     */
+    public SparkGraphComputer skipGraphCache(final boolean skip) {
+        return configure(GREMLIN_SPARK_SKIP_GRAPH_CACHE, skip);
+    }
+
+    /**
+     * Specifies the {@code org.apache.spark.serializer.Serializer} implementation to use. By default, this value is
+     * set to {@link GryoSerializer}.
+     */
+    public SparkGraphComputer serializer(final Class<? extends Serializer> serializer) {
+        return configure(SPARK_SERIALIZER, serializer.getCanonicalName());
+    }
+
     @Override
     public Future<ComputerResult> submit() {
         this.validateStatePriorToExecution();
@@ -135,8 +197,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the graph computer computation
             final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-            if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
-                graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+            if (!graphComputerConfiguration.containsKey(SPARK_SERIALIZER))
+                graphComputerConfiguration.setProperty(SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
             graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
             final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
@@ -144,8 +206,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
-            final boolean skipPartitioner = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
-            final boolean skipPersist = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
+            final boolean skipPartitioner = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_PARTITIONER, false);
+            final boolean skipPersist = graphComputerConfiguration.getBoolean(GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
             if (inputFromHDFS) {
                 String inputLocation = Constants
                         .getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
@@ -230,7 +292,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         assert loadedGraphRDD.partitioner().isPresent();
                     } else {
                         assert skipPartitioner == !loadedGraphRDD.partitioner().isPresent(); // no easy way to test this with a test case
-                        this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+                        this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + GREMLIN_SPARK_SKIP_PARTITIONER);
                     }
                 }
                 // if the loaded graphRDD was already partitioned previous, then this coalesce/repartition will not take place
@@ -242,7 +304,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 }
                 // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
                 if (!skipPersist && (!inputFromSpark || partitioned || filtered))
-                    loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                    loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
 
                 // final graph with view (for persisting and/or mapReducing -- may be null and thus, possible to save space/time)
                 JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
@@ -323,7 +385,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         });
                         // if there is only one MapReduce to execute, don't bother wasting the clock cycles.
                         if (this.mapReducers.size() > 1)
-                            mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                            mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
                     }
 
                     for (final MapReduce mapReduce : this.mapReducers) {
@@ -370,11 +432,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // clear properties that should not be propagated in an OLAP chain
                 graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
                 graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
-                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
-                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+                graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_GRAPH_CACHE);
+                graphComputerConfiguration.clearProperty(GREMLIN_SPARK_SKIP_PARTITIONER);
                 return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                if (!graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                if (!graphComputerConfiguration.getBoolean(GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     Spark.close();
             }
         });

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bd85e5fe/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index d4201b5..469c4b1 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.spark.process.computer;
 
+import org.apache.spark.launcher.SparkLauncher;
 import org.apache.tinkerpop.gremlin.GraphProvider;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.groovy.util.SugarTestHelper;
@@ -82,8 +83,8 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider {
         }
 
         config.put(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, SparkGraphComputer.class.getCanonicalName());
-        config.put("spark.master", "local[4]");
-        config.put("spark.serializer", GryoSerializer.class.getCanonicalName());
+        config.put(SparkLauncher.SPARK_MASTER, "local[4]");
+        config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
         config.put("spark.kryo.registrationRequired", true);
         return config;
     }