Posted to commits@spark.apache.org by pw...@apache.org on 2014/07/31 20:51:34 UTC

git commit: SPARK-2664. Deal with `--conf` options in spark-submit that relate to flags

Repository: spark
Updated Branches:
  refs/heads/master f19331235 -> f68105df5


SPARK-2664. Deal with `--conf` options in spark-submit that relate to flags

Author: Sandy Ryza <sa...@cloudera.com>

Closes #1665 from sryza/sandy-spark-2664 and squashes the following commits:

0518c63 [Sandy Ryza] SPARK-2664. Deal with `--conf` options in spark-submit that relate to flags

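In short: a configuration value can reach spark-submit three ways (a dedicated flag such as --executor-memory, a --conf key=value pair, or the spark-defaults.conf file), and this patch fixes the precedence so that flags beat --conf, which in turn beats the defaults file. For example, a command line such as

    spark-submit --executor-memory 5g --conf spark.executor.memory=4g <app jar> ...

now launches executors with 5g, while a --conf value still overrides one from spark-defaults.conf; the new test below exercises exactly this.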

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f68105df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f68105df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f68105df

Branch: refs/heads/master
Commit: f68105df52902a1c65207d4f51bfdeb55cccf767
Parents: f193312
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Thu Jul 31 11:51:20 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jul 31 11:51:20 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 11 ++++++---
 .../spark/deploy/SparkSubmitArguments.scala     | 26 +++++++++++---------
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 16 ++++++++++++
 3 files changed, 38 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f68105df/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 3df811c..318509a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -184,7 +184,7 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
 
       // Yarn cluster only
-      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
       OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@ object SparkSubmit {
       }
     }
 
+    // Properties given with --conf are superseded by other options, but take precedence over
+    // properties in the defaults file.
+    for ((k, v) <- args.sparkProperties) {
+      sysProps.getOrElseUpdate(k, v)
+    }
+
     // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }
 
-    // Spark properties included on command line take precedence
-    sysProps ++= args.sparkProperties
-
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
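The hunk above is the core of the fix: by this point sysProps already holds values derived from explicit flags, so getOrElseUpdate only fills in keys the flags left unset, first from --conf pairs and then from the defaults file. A minimal standalone sketch of that ordering (illustrative values only, not Spark source):

    import scala.collection.mutable

    object PrecedenceSketch extends App {
      val sysProps = mutable.HashMap[String, String]()

      // 1. Flag-derived values are written first, so they always win.
      sysProps("spark.executor.memory") = "5g"     // from --executor-memory 5g

      // 2. --conf pairs fill only the keys the flags left unset.
      val confProps = Map(
        "spark.executor.memory" -> "4g",
        "spark.shuffle.spill" -> "false")
      for ((k, v) <- confProps) sysProps.getOrElseUpdate(k, v)

      // 3. The defaults file is the final fallback.
      val defaults = Map(
        "spark.executor.memory" -> "2g",
        "spark.app.name" -> "default-app")
      for ((k, v) <- defaults) sysProps.getOrElseUpdate(k, v)

      println(sysProps("spark.executor.memory"))   // 5g: the flag wins
      println(sysProps("spark.shuffle.spill"))     // false: --conf fills the gap
      println(sysProps("spark.app.name"))          // default-app: from the defaults file
    }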

http://git-wip-us.apache.org/repos/asf/spark/blob/f68105df/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 01d0ae5..dd044e6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
 
   parseOpts(args.toList)
-  loadDefaults()
+  mergeSparkProperties()
   checkRequiredArguments()
 
   /** Return defaults present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }
 
-  /** Fill in any undefined values based on the current properties file or built-in defaults. */
-  private def loadDefaults(): Unit = {
-
+  /**
+   * Fill in any undefined values based on the default properties file or options passed in through
+   * the '--conf' flag.
+   */
+  private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
       sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       }
     }
 
-    val defaultProperties = getDefaultSparkProperties
+    val properties = getDefaultSparkProperties
+    properties.putAll(sparkProperties)
+
     // Use properties file as fallback for values which have a direct analog to
     // arguments in this script.
-    master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+    master = Option(master).getOrElse(properties.get("spark.master").orNull)
     executorMemory = Option(executorMemory)
-      .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+      .getOrElse(properties.get("spark.executor.memory").orNull)
     executorCores = Option(executorCores)
-      .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+      .getOrElse(properties.get("spark.executor.cores").orNull)
     totalExecutorCores = Option(totalExecutorCores)
-      .getOrElse(defaultProperties.get("spark.cores.max").orNull)
-    name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
-    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+      .getOrElse(properties.get("spark.cores.max").orNull)
+    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
+    jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)
 
     // This supports env vars in older versions of Spark
     master = Option(master).getOrElse(System.getenv("MASTER"))

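On the SparkSubmitArguments side, mergeSparkProperties applies the same ordering when back-filling the flag fields themselves: --conf values are overlaid onto the defaults-file map, and an explicitly supplied flag is always consulted before the merged map. A rough sketch of that merge, using java.util.Properties for brevity (the collection types are simplified relative to the real code):

    import java.util.Properties

    object MergeSketch extends App {
      val merged = new Properties()            // loaded from spark-defaults.conf
      merged.setProperty("spark.master", "local[2]")
      merged.setProperty("spark.executor.memory", "2g")

      val conf = new Properties()              // collected from repeated --conf flags
      conf.setProperty("spark.executor.memory", "4g")
      merged.putAll(conf)                      // --conf entries overwrite defaults

      // A flag value, when present, is consulted first; the merged
      // properties are only a fallback (the flag value here is hypothetical).
      val flagMemory: String = "5g"            // as if --executor-memory 5g was passed
      val executorMemory =
        Option(flagMemory).getOrElse(merged.getProperty("spark.executor.memory"))
      println(executorMemory)                  // 5g
    }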
http://git-wip-us.apache.org/repos/asf/spark/blob/f68105df/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index a301cbd..9190b05 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -253,6 +253,22 @@ class SparkSubmitSuite extends FunSuite with Matchers {
     sysProps("spark.shuffle.spill") should be ("false")
   }
 
+  test("handles confs with flag equivalents") {
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--executor-memory", "5g",
+      "--class", "org.SomeClass",
+      "--conf", "spark.executor.memory=4g",
+      "--conf", "spark.master=yarn",
+      "thejar.jar",
+      "arg1", "arg2")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs)
+    sysProps("spark.executor.memory") should be ("5g")
+    sysProps("spark.master") should be ("yarn-cluster")
+    mainClass should be ("org.apache.spark.deploy.yarn.Client")
+  }
+
   test("launch simple application with spark-submit") {
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val args = Seq(