Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/11/05 02:56:43 UTC

[25/50] [abbrv] incubator-tinkerpop git commit: TINKERPOP-911: Lets SparkGraphComputer Set ThreadLocal Properties

TINKERPOP-911: Lets SparkGraphComputer Set ThreadLocal Properties

When running in persistent-context mode, the Spark context's configuration is
always inherited from the configuration it was originally created with. Select
properties can be overridden via SparkContext.setLocalProperty. By enabling
this in SparkGraphComputer, a user can take advantage of Spark's FairScheduler
or assign custom Spark job group ids to jobs running in the same JVM.
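
For illustration, the following is a minimal sketch (not part of this commit) of how
a caller might pass such thread-local properties through the graph configuration. The
class name, input/output paths, pool name, and job group id are hypothetical; the
property keys and the TinkerPop/Spark classes are the ones exercised by this change.

import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class ThreadLocalPropertiesExample {                                  // hypothetical example class
    public static void main(final String[] args) throws Exception {
        final Configuration configuration = new BaseConfiguration();
        configuration.setProperty("spark.master", "local[4]");
        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, "data/tinkerpop-modern.kryo"); // hypothetical path
        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "output");                    // hypothetical path
        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);  // reuse the JVM-wide SparkContext
        // The two properties below are copied onto the SparkContext's thread-local properties
        // by this change, so different threads in the same JVM can use different values.
        configuration.setProperty("spark.scheduler.pool", "analytics");            // FairScheduler pool name
        configuration.setProperty("spark.jobGroup.id", "olap-42");                 // custom job group id

        final Graph graph = GraphFactory.open(configuration);
        graph.compute(SparkGraphComputer.class)
                .program(PageRankVertexProgram.build().create(graph))
                .submit().get();
    }
}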


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/82229d0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/82229d0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/82229d0b

Branch: refs/heads/TINKERPOP3-923
Commit: 82229d0bef3da7bb125e9b31fc5ef559a8868af0
Parents: aadd422
Author: Russell Spitzer <Ru...@gmail.com>
Authored: Mon Nov 2 15:22:45 2015 -0800
Committer: Russell Spitzer <Ru...@gmail.com>
Committed: Mon Nov 2 18:55:36 2015 -0800

----------------------------------------------------------------------
 .../process/computer/SparkGraphComputer.java    |  32 ++++++
 .../process/computer/LocalPropertyTest.java     | 102 +++++++++++++++++++
 2 files changed, 134 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/82229d0b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index b25073b..6425aab 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -135,6 +135,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             JavaSparkContext sparkContext = null;
             try {
                 sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+                updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // add the project jars to the cluster
                 this.loadJars(sparkContext, hadoopConfiguration);
                 // create a message-passing friendly rdd from the input rdd
@@ -246,6 +247,37 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         }
     }
 
+    /**
+     * When using a persistent context, the running context's configuration overrides any
+     * configuration passed in. Spark allows these inherited properties to be overridden
+     * via SparkContext.setLocalProperty.
+     */
+    private void updateLocalConfiguration(final JavaSparkContext sparkContext, final SparkConf sparkConfiguration) {
+        /*
+         * While we could enumerate the entire SparkConf and copy it into the thread-local
+         * properties of the Spark context, doing so could cause adverse effects with future
+         * versions of Spark. Since the API for setting multiple local properties at once is
+         * restricted as private, we only set the properties known to affect SparkGraphComputer
+         * execution rather than applying the entire configuration.
+         */
+        final String[] validPropertyNames = {
+            "spark.job.description",
+            "spark.jobGroup.id",
+            "spark.job.interruptOnCancel",
+            "spark.scheduler.pool"
+        };
+
+        for (final String propertyName : validPropertyNames) {
+            if (sparkConfiguration.contains(propertyName)) {
+                final String propertyValue = sparkConfiguration.get(propertyName);
+                this.logger.info("Setting Thread Local SparkContext Property - "
+                        + propertyName + " : " + propertyValue);
+
+                sparkContext.setLocalProperty(propertyName, propertyValue);
+            }
+        }
+    }
+
     public static void main(final String[] args) throws Exception {
         final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
         new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
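
For reference, the Spark-side mechanism being wrapped above looks roughly like the
standalone sketch below. It uses only public Spark API and is not part of this commit;
the application, group, and pool names are made up, and the scheduler pool only has an
effect when the FAIR scheduler is enabled.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SetLocalPropertySketch {
    public static void main(final String[] args) {
        final SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("setLocalProperty-sketch");
        final JavaSparkContext sparkContext = new JavaSparkContext(conf);
        // Local properties are scoped to the submitting thread and inherited by the jobs
        // launched from it, even when the SparkContext itself is shared or persistent.
        sparkContext.setLocalProperty("spark.jobGroup.id", "group-A");
        sparkContext.setLocalProperty("spark.scheduler.pool", "fair-pool");
        System.out.println(sparkContext.getLocalProperty("spark.jobGroup.id"));  // prints group-A
        sparkContext.parallelize(Arrays.asList(1, 2, 3)).count();                // runs under group-A / fair-pool
        sparkContext.stop();
    }
}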

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/82229d0b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java
new file mode 100644
index 0000000..57c3526
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkStatusTracker;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+
+public class LocalPropertyTest
+{
+
+
+    @Test
+    public void shouldSetThreadLocalProperties() throws Exception
+    {
+        final String testName = "ThreadLocalProperties";
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty("spark.master", "local[4]");
+        configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        configuration.setProperty("spark.jobGroup.id", "22");
+        Graph graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ////////
+        SparkConf sparkConfiguration = new SparkConf();
+        sparkConfiguration.setAppName(testName);
+        ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        JavaSparkStatusTracker statusTracker = sparkContext.statusTracker();
+        assertTrue(statusTracker.getJobIdsForGroup("22").length >= 1);
+        assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        ///////
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+        configuration.setProperty("spark.jobGroup.id", "44");
+        graph = GraphFactory.open(configuration);
+        graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.NOTHING)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+                                "gremlin-groovy",
+                                "g.V()").create(graph)).submit().get();
+        ///////
+        assertTrue(statusTracker.getJobIdsForGroup("44").length >= 1);
+    }
+}
+