You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2018/06/01 12:05:38 UTC
[09/50] tinkerpop git commit: Merge branch 'tp32' into tp33
Merge branch 'tp32' into tp33
Conflicts:
spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3891777e
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3891777e
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3891777e
Branch: refs/heads/TINKERPOP-1967
Commit: 3891777e4b30665bd47a5ead9e50871f37f7e9d8
Parents: a708cc3 bd85e5f
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue May 22 07:08:22 2018 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue May 22 07:08:22 2018 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
.../process/computer/SparkGraphComputer.java | 104 ++++++++++++++++---
2 files changed, 93 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3891777e/CHANGELOG.asciidoc
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3891777e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index dafe613,4c896cd..5184db6
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@@ -33,9 -33,7 +33,9 @@@ import org.apache.spark.Partitioner
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
++import org.apache.spark.serializer.KryoRegistrator;
+import org.apache.spark.serializer.KryoSerializer;
+ import org.apache.spark.serializer.Serializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
@@@ -87,7 -78,16 +87,17 @@@ import java.util.concurrent.Executors
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_CONTEXT;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_PARTITIONER;
++import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_KRYO_REGISTRATION_REQUIRED;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_SERIALIZER;
+
/**
+ * {@link GraphComputer} implementation for Apache Spark.
+ *
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@@@ -116,10 -112,15 +126,14 @@@
public SparkGraphComputer(final HadoopGraph hadoopGraph) {
super(hadoopGraph);
this.sparkConfiguration = new HadoopConfiguration();
- ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
}
+ /**
+ * Sets the number of workers. If the {@code spark.master} configuration starts with "local" then it will be
+ * changed to {@code local[n]}, where {@code n} is the specified number of worker threads.
+ */
@Override
- public GraphComputer workers(final int workers) {
+ public SparkGraphComputer workers(final int workers) {
super.workers(workers);
if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
@@@ -134,6 -135,56 +148,72 @@@
return this;
}
+ /**
+ * Sets the {@code spark.master} configuration option, which is the cluster manager to connect to. The value may be
+ * one of the <a href="https://spark.apache.org/docs/latest/submitting-applications.html#master-urls">allowed master URLs</a>.
+ */
+ public SparkGraphComputer master(final String clusterManager) {
+ return configure(SparkLauncher.SPARK_MASTER, clusterManager);
+ }
+
+ /**
+ * Determines if the Spark context should be left open, preventing Spark from garbage collecting unreferenced RDDs.
+ */
+ public SparkGraphComputer persistContext(final boolean persist) {
+ return configure(GREMLIN_SPARK_PERSIST_CONTEXT, persist);
+ }
+
+ /**
+ * Specifies the method by which the {@link VertexProgram}-created graph is persisted. By default, it is configured
+ * to use {@code StorageLevel#MEMORY_ONLY()}.
+ */
+ public SparkGraphComputer graphStorageLevel(final StorageLevel storageLevel) {
+ return configure(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, storageLevel.description());
+ }
+
+ public SparkGraphComputer persistStorageLevel(final StorageLevel storageLevel) {
+ return configure(GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel.description());
+ }
+
+ /**
+ * Determines if partitioning of the graph RDD should be skipped. By default, this value is {@code false}.
+ */
+ public SparkGraphComputer skipPartitioner(final boolean skip) {
+ return configure(GREMLIN_SPARK_SKIP_PARTITIONER, skip);
+ }
+
+ /**
+ * Determines if caching of the graph RDD should be skipped. If {@code true} then
+ * {@link #graphStorageLevel(StorageLevel)} is ignored. By default, this value is {@code false}.
+ */
+ public SparkGraphComputer skipGraphCache(final boolean skip) {
+ return configure(GREMLIN_SPARK_SKIP_GRAPH_CACHE, skip);
+ }
+
+ /**
+ * Specifies the {@code org.apache.spark.serializer.Serializer} implementation to use. By default, this value is
- * set to {@link GryoSerializer}.
++ * set to {@code org.apache.spark.serializer.KryoSerializer}.
+ */
+ public SparkGraphComputer serializer(final Class<? extends Serializer> serializer) {
+ return configure(SPARK_SERIALIZER, serializer.getCanonicalName());
+ }
+
++ /**
++ * Specifies the {@code org.apache.spark.serializer.KryoRegistrator} to use to install additional types. By
++ * default this value is set to TinkerPop's {@link GryoRegistrator}.
++ */
++ public SparkGraphComputer sparkKryoRegistrator(final Class<? extends KryoRegistrator> registrator) {
++ return configure(Constants.SPARK_KRYO_REGISTRATOR, registrator.getCanonicalName());
++ }
++
++ /**
++ * Determines if kryo registration is required such that attempts to serialize classes that are not registered
++ * will result in an error. By default this value is {@code false}.
++ */
++ public SparkGraphComputer kryoRegistrationRequired(final boolean required) {
++ return configure(SPARK_KRYO_REGISTRATION_REQUIRED, required);
++ }
++
@Override
public Future<ComputerResult> submit() {
this.validateStatePriorToExecution();