You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/11 05:58:06 UTC

git commit: [SPARK-1774] Respect SparkSubmit --jars on YARN (client)

Repository: spark
Updated Branches:
  refs/heads/master 2b7bd29eb -> 83e0424d8


[SPARK-1774] Respect SparkSubmit --jars on YARN (client)

SparkSubmit ignores `--jars` in YARN client mode. This is a bug.

This PR also automatically adds the application jar to `spark.jars`. Previously, when running in yarn-client mode, you had to specify the jar additionally through `--files` (because `--jars` didn't work). Now you don't have to explicitly specify it through either.

Tested on a YARN cluster.

Author: Andrew Or <an...@gmail.com>

Closes #710 from andrewor14/yarn-jars and squashes the following commits:

35d1928 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-jars
c27bf6c [Andrew Or] For yarn-cluster and python, do not add primaryResource to spark.jar
c92c5bf [Andrew Or] Minor cleanups
269f9f3 [Andrew Or] Fix format
013d840 [Andrew Or] Fix tests
1407474 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-jars
3bb75e8 [Andrew Or] Allow SparkSubmit --jars to take effect in yarn-client mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83e0424d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83e0424d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83e0424d

Branch: refs/heads/master
Commit: 83e0424d87022e7a967088365931a08aa06ffd9f
Parents: 2b7bd29
Author: Andrew Or <an...@gmail.com>
Authored: Sat May 10 20:58:02 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sat May 10 20:58:02 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |  39 ++++---
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 110 +++++++++++++------
 .../spark/deploy/yarn/ClientArguments.scala     |   4 +-
 4 files changed, 102 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83e0424d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c639b3e..71bab29 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging {
             if (SparkHadoopUtil.get.isYarnMode() &&
                 (master == "yarn-standalone" || master == "yarn-cluster")) {
               // In order for this to work in yarn-cluster mode the user must specify the
-              // --addjars option to the client to upload the file into the distributed cache
+              // --addJars option to the client to upload the file into the distributed cache
               // of the AM to make it show up in the current working directory.
               val fileName = new Path(uri.getPath).getName()
               try {

http://git-wip-us.apache.org/repos/asf/spark/blob/83e0424d/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 16de6f7..c6d3cbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -67,8 +67,7 @@ object SparkSubmit {
   private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
 
   /**
-   * @return
-   *         a tuple containing the arguments for the child, a list of classpath
+   * @return a tuple containing the arguments for the child, a list of classpath
    *         entries for the child, a list of system propertes, a list of env vars
    *         and the main class for the child
    */
@@ -115,13 +114,16 @@ object SparkSubmit {
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""
 
+    val isPython = args.isPython
+    val isYarnCluster = clusterManager == YARN && deployOnCluster
+
     if (clusterManager == MESOS && deployOnCluster) {
       printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
     // If we're running a Python app, set the Java class to run to be our PythonRunner, add
     // Python files to deployment list, and pass the main file and Python path to PythonRunner
-    if (args.isPython) {
+    if (isPython) {
       if (deployOnCluster) {
         printErrorAndExit("Cannot currently run Python driver programs on cluster")
       }
@@ -161,6 +163,7 @@ object SparkSubmit {
     val options = List[OptionAssigner](
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
+      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
       OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraClassPath"),
       OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
@@ -168,7 +171,8 @@ object SparkSubmit {
       OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraLibraryPath"),
       OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
-      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
+      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
       OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
       OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
@@ -176,20 +180,18 @@ object SparkSubmit {
       OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
         sysProp = "spark.executor.memory"),
-      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
       OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
       OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
       OptionAssigner(args.files, YARN, true, clOption = "--files"),
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
       OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
       OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
     )
 
     // For client mode make any added jars immediately visible on the classpath
@@ -212,9 +214,10 @@ object SparkSubmit {
       }
     }
 
-    // For standalone mode, add the application jar automatically so the user doesn't have to
-    // call sc.addJar. TODO: Standalone mode in the cluster
-    if (clusterManager == STANDALONE) {
+    // Add the application jar automatically so the user doesn't have to call sc.addJar
+    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
+    // For python files, the primary resource is already distributed as a regular file
+    if (!isYarnCluster && !isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
       if (args.primaryResource != RESERVED_JAR_NAME) {
         jars = jars ++ Seq(args.primaryResource)
@@ -222,11 +225,11 @@ object SparkSubmit {
       sysProps.put("spark.jars", jars.mkString(","))
     }
 
+    // Standalone cluster specific configurations
     if (deployOnCluster && clusterManager == STANDALONE) {
       if (args.supervise) {
         childArgs += "--supervise"
       }
-
       childMainClass = "org.apache.spark.deploy.Client"
       childArgs += "launch"
       childArgs += (args.master, args.primaryResource, args.mainClass)
@@ -243,6 +246,7 @@ object SparkSubmit {
       }
     }
 
+    // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
       if (!sysProps.contains(k)) sysProps(k) = v
     }
@@ -250,9 +254,12 @@ object SparkSubmit {
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
-  private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
-  {
+  private def launch(
+      childArgs: ArrayBuffer[String],
+      childClasspath: ArrayBuffer[String],
+      sysProps: Map[String, String],
+      childMainClass: String,
+      verbose: Boolean = false) {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")

http://git-wip-us.apache.org/repos/asf/spark/blob/83e0424d/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index c9edb03..6c0deed 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -87,25 +87,41 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles arguments with --key=val") {
-    val clArgs = Seq("--jars=one.jar,two.jar,three.jar", "--name=myApp")
+    val clArgs = Seq(
+      "--jars=one.jar,two.jar,three.jar",
+      "--name=myApp")
     val appArgs = new SparkSubmitArguments(clArgs)
     appArgs.jars should be ("one.jar,two.jar,three.jar")
     appArgs.name should be ("myApp")
   }
 
   test("handles arguments to user program") {
-    val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args")
+    val clArgs = Seq(
+      "--name", "myApp",
+      "--class", "Foo",
+      "userjar.jar",
+      "some",
+      "--weird", "args")
     val appArgs = new SparkSubmitArguments(clArgs)
     appArgs.childArgs should be (Seq("some", "--weird", "args"))
   }
 
   test("handles YARN cluster mode") {
-    val clArgs = Seq("--deploy-mode", "cluster",
-      "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
-      "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
-      "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty",
-      "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "yarn",
+      "--executor-memory", "5g",
+      "--executor-cores", "5",
+      "--class", "org.SomeClass",
+      "--jars", "one.jar,two.jar,three.jar",
+      "--driver-memory", "4g",
+      "--queue", "thequeue",
+      "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt",
+      "--num-executors", "6",
+      "--name", "beauty",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
@@ -127,12 +143,21 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles YARN client mode") {
-    val clArgs = Seq("--deploy-mode", "client",
-      "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
-      "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
-      "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill",
-      "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--master", "yarn",
+      "--executor-memory", "5g",
+      "--executor-cores", "5",
+      "--class", "org.SomeClass",
+      "--jars", "one.jar,two.jar,three.jar",
+      "--driver-memory", "4g",
+      "--queue", "thequeue",
+      "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt",
+      "--num-executors", "6",
+      "--name", "trill",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -142,6 +167,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     classpath should contain ("two.jar")
     classpath should contain ("three.jar")
     sysProps("spark.app.name") should be ("trill")
+    sysProps("spark.jars") should be ("one.jar,two.jar,three.jar,thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.executor.cores") should be ("5")
     sysProps("spark.yarn.queue") should be ("thequeue")
@@ -152,9 +178,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles standalone cluster mode") {
-    val clArgs = Seq("--deploy-mode", "cluster",
-      "--master", "spark://h:p", "--class", "org.SomeClass",
-      "--supervise", "--driver-memory", "4g", "--driver-cores", "5", "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "spark://h:p",
+      "--class", "org.SomeClass",
+      "--supervise",
+      "--driver-memory", "4g",
+      "--driver-cores", "5",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
@@ -166,9 +198,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles standalone client mode") {
-    val clArgs = Seq("--deploy-mode", "client",
-      "--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
-      "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--master", "spark://h:p",
+      "--executor-memory", "5g",
+      "--total-executor-cores", "5",
+      "--class", "org.SomeClass",
+      "--driver-memory", "4g",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -179,9 +217,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles mesos client mode") {
-    val clArgs = Seq("--deploy-mode", "client",
-      "--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
-      "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--master", "mesos://h:p",
+      "--executor-memory", "5g",
+      "--total-executor-cores", "5",
+      "--class", "org.SomeClass",
+      "--driver-memory", "4g",
+      "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -192,15 +236,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("launch simple application with spark-submit") {
-    runSparkSubmit(
-      Seq(
-        "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
-        "--name", "testApp",
-        "--master", "local",
-        "unUsed.jar"))
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local",
+      unusedJar.toString)
+    runSparkSubmit(args)
   }
 
   test("spark submit includes jars passed in through --jar") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
     val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
     val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
@@ -209,7 +255,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
       "--name", "testApp",
       "--master", "local-cluster[2,1,512]",
       "--jars", jarsString,
-      "unused.jar")
+      unusedJar.toString)
     runSparkSubmit(args)
   }
 
@@ -227,7 +273,7 @@ object JarCreationTest {
   def main(args: Array[String]) {
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
-    val result = sc.makeRDD(1 to 100, 10).mapPartitions{ x =>
+    val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
       var foundClasses = false
       try {
         Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
@@ -248,7 +294,6 @@ object SimpleApplicationTest {
   def main(args: Array[String]) {
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
-
     val configs = Seq("spark.master", "spark.app.name")
     for (config <- configs) {
       val masterValue = conf.get(config)
@@ -266,6 +311,5 @@ object SimpleApplicationTest {
           s"Master had $config=$masterValue but executor had $config=$executorValue")
       }
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/83e0424d/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 3e4c739..b2c413b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
+import org.apache.spark.scheduler.InputFormatInfo
 import org.apache.spark.util.IntParam
 import org.apache.spark.util.MemoryParam
 
@@ -40,9 +40,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
-  // TODO
   var inputFormatInfo: List[InputFormatInfo] = null
-  // TODO(harvey)
   var priority = 0
 
   parseArgs(args.toList)