Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/31 21:07:23 UTC

git commit: SPARK-1352: Improve robustness of spark-submit script

Repository: spark
Updated Branches:
  refs/heads/master d66605367 -> 841721e03


SPARK-1352: Improve robustness of spark-submit script

1. Better error messages when required arguments are missing.
2. Support for unit-testing cases where the supplied arguments are invalid.
3. Bug fix: Only use environment variables when they are set (an unset variable was previously assigned as null, causing an NPE later); see the sketch after this list.
4. A verbose mode to aid debugging.
5. Visibility of several variables is set to private.
6. Deprecation warning for existing scripts.
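
The fix in item 3 replaces direct assignment from System.getenv (which yields
null when a variable is unset) with an Option wrapper, so unset variables are
simply skipped. A minimal, self-contained sketch of the pattern (the "local"
default here is illustrative, not taken from the patch):

    object EnvVarPattern {
      // Illustrative default; the real fields start as null and are
      // validated separately.
      var master: String = "local"

      def loadEnvVars(): Unit = {
        // Option(null) is None, so an unset MASTER leaves the default
        // untouched instead of overwriting it with null.
        Option(System.getenv("MASTER")).foreach(master = _)
      }

      def main(args: Array[String]): Unit = {
        loadEnvVars()
        println(s"master = $master")  // never null, even when MASTER is unset
      }
    }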

Author: Patrick Wendell <pw...@gmail.com>

Closes #271 from pwendell/spark-submit and squashes the following commits:

9146def [Patrick Wendell] SPARK-1352: Improve robustness of spark-submit script


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/841721e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/841721e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/841721e0

Branch: refs/heads/master
Commit: 841721e03cc44ee7d8fe72c882db8c0f9f3af365
Parents: d666053
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Mar 31 12:07:14 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Mar 31 12:07:14 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/Client.scala  |  3 +
 .../org/apache/spark/deploy/SparkSubmit.scala   | 67 +++++++++++-------
 .../spark/deploy/SparkSubmitArguments.scala     | 74 ++++++++++++++------
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 61 +++++++++++++++-
 .../org/apache/spark/deploy/yarn/Client.scala   |  3 +
 .../org/apache/spark/deploy/yarn/Client.scala   |  3 +
 6 files changed, 163 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/841721e0/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index d9e3035..8fd2c7e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -128,6 +128,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
  */
 object Client {
   def main(args: Array[String]) {
+    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
+    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
+
     val conf = new SparkConf()
     val driverArgs = new ClientArguments(args)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/841721e0/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 24a9c98..1fa7991 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.File
+import java.io.{PrintStream, File}
 import java.net.URL
 
 import org.apache.spark.executor.ExecutorURLClassLoader
@@ -32,38 +32,51 @@ import scala.collection.mutable.Map
  * modes that Spark supports.
  */
 object SparkSubmit {
-  val YARN = 1
-  val STANDALONE = 2
-  val MESOS = 4
-  val LOCAL = 8
-  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+  private val YARN = 1
+  private val STANDALONE = 2
+  private val MESOS = 4
+  private val LOCAL = 8
+  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
 
-  var clusterManager: Int = LOCAL
+  private var clusterManager: Int = LOCAL
 
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
+    if (appArgs.verbose) {
+      printStream.println(appArgs)
+    }
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
-    launch(childArgs, classpath, sysProps, mainClass)
+    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
   }
 
+  // Exposed for testing
+  private[spark] var printStream: PrintStream = System.err
+  private[spark] var exitFn: () => Unit = () => System.exit(-1)
+
+  private[spark] def printErrorAndExit(str: String) = {
+    printStream.println("error: " + str)
+    printStream.println("run with --help for more information or --verbose for debugging output")
+    exitFn()
+  }
+  private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
+
   /**
    * @return
    *         a tuple containing the arguments for the child, a list of classpath
    *         entries for the child, and the main class for the child
    */
-  def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+  private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
       ArrayBuffer[String], Map[String, String], String) = {
-    if (appArgs.master.startsWith("yarn")) {
+    if (appArgs.master.startsWith("local")) {
+      clusterManager = LOCAL
+    } else if (appArgs.master.startsWith("yarn")) {
       clusterManager = YARN
     } else if (appArgs.master.startsWith("spark")) {
       clusterManager = STANDALONE
     } else if (appArgs.master.startsWith("mesos")) {
       clusterManager = MESOS
-    } else if (appArgs.master.startsWith("local")) {
-      clusterManager = LOCAL
     } else {
-      System.err.println("master must start with yarn, mesos, spark, or local")
-      System.exit(1)
+      printErrorAndExit("master must start with yarn, mesos, spark, or local")
     }
 
     // Because "yarn-standalone" and "yarn-client" encapsulate both the master
@@ -73,12 +86,10 @@ object SparkSubmit {
       appArgs.deployMode = "cluster"
     }
     if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
-      System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
-      System.exit(1)
+      printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
     }
     if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
-      System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
-      System.exit(1)
+      printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
     }
     if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
       appArgs.master = "yarn-standalone"
@@ -95,8 +106,7 @@ object SparkSubmit {
     var childMainClass = ""
 
     if (clusterManager == MESOS && deployOnCluster) {
-      System.err.println("Mesos does not support running the driver on the cluster")
-      System.exit(1)
+      printErrorAndExit("Mesos does not support running the driver on the cluster")
     }
 
     if (!deployOnCluster) {
@@ -174,8 +184,17 @@ object SparkSubmit {
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
-  def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String], childMainClass: String) {
+  private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
+      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
+
+    if (verbose) {
+      System.err.println(s"Main class:\n$childMainClass")
+      System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
+      System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
+      System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+      System.err.println("\n")
+    }
+
     val loader = new ExecutorURLClassLoader(new Array[URL](0),
       Thread.currentThread.getContextClassLoader)
     Thread.currentThread.setContextClassLoader(loader)
@@ -193,10 +212,10 @@ object SparkSubmit {
     mainMethod.invoke(null, childArgs.toArray)
   }
 
-  def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+  private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
     val localJarFile = new File(localJar)
     if (!localJarFile.exists()) {
-      System.err.println("Jar does not exist: " + localJar + ". Skipping.")
+      printWarning(s"Jar $localJar does not exist, skipping.")
     }
 
     val url = localJarFile.getAbsoluteFile.toURI.toURL
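
The printStream and exitFn members introduced above are the seams that make
item 2 testable: production code keeps printing to System.err and calling
System.exit, while a test can swap in a buffer and a no-op. A self-contained
miniature of the same seam pattern (the names Cli, out, and fail are
hypothetical, not Spark's):

    object Cli {
      // Seams: tests replace these to observe output and intercept "exit".
      var out: java.io.PrintStream = System.err
      var exitFn: () => Unit = () => System.exit(-1)

      def fail(msg: String): Unit = {
        out.println("error: " + msg)
        exitFn()
      }
    }

    object CliTest extends App {
      // Capture output and record the exit instead of killing the JVM.
      val buffer = new java.io.ByteArrayOutputStream()
      Cli.out = new java.io.PrintStream(buffer)
      var exited = false
      Cli.exitFn = () => exited = true

      Cli.fail("boom")
      assert(exited && buffer.toString.contains("error: boom"))
    }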

http://git-wip-us.apache.org/repos/asf/spark/blob/841721e0/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index ff2aa68..9c8f54e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -40,25 +40,45 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   var name: String = null
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
+  var verbose: Boolean = false
 
   loadEnvVars()
-  parseArgs(args.toList)
-
-  def loadEnvVars() {
-    master = System.getenv("MASTER")
-    deployMode = System.getenv("DEPLOY_MODE")
+  parseOpts(args.toList)
+
+  // Sanity checks
+  if (args.length == 0) printUsageAndExit(-1)
+  if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+  if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+
+  override def toString = {
+    s"""Parsed arguments:
+    |  master             $master
+    |  deployMode         $deployMode
+    |  executorMemory     $executorMemory
+    |  executorCores      $executorCores
+    |  totalExecutorCores $totalExecutorCores
+    |  driverMemory       $driverMemory
+    |  driverCores        $driverCores
+    |  supervise          $supervise
+    |  queue              $queue
+    |  numExecutors       $numExecutors
+    |  files              $files
+    |  archives           $archives
+    |  mainClass          $mainClass
+    |  primaryResource    $primaryResource
+    |  name               $name
+    |  childArgs          [${childArgs.mkString(" ")}]
+    |  jars               $jars
+    |  verbose            $verbose
+    """.stripMargin
   }
 
-  def parseArgs(args: List[String]) {
-    if (args.size == 0) {
-      printUsageAndExit(1)
-      System.exit(1)
-    }
-    primaryResource = args(0)
-    parseOpts(args.tail)
+  private def loadEnvVars() {
+    Option(System.getenv("MASTER")).map(master = _)
+    Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
   }
 
-  def parseOpts(opts: List[String]): Unit = opts match {
+  private def parseOpts(opts: List[String]): Unit = opts match {
     case ("--name") :: value :: tail =>
       name = value
       parseOpts(tail)
@@ -73,8 +93,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
 
     case ("--deploy-mode") :: value :: tail =>
       if (value != "client" && value != "cluster") {
-        System.err.println("--deploy-mode must be either \"client\" or \"cluster\"")
-        System.exit(1)
+        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
       }
       deployMode = value
       parseOpts(tail)
@@ -130,17 +149,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     case ("--help" | "-h") :: tail =>
       printUsageAndExit(0)
 
-    case Nil =>
+    case ("--verbose" | "-v") :: tail =>
+      verbose = true
+      parseOpts(tail)
 
-    case _ =>
-      printUsageAndExit(1, opts)
+    case value :: tail =>
+      if (primaryResource != null) {
+        val error = s"Found two conflicting resources, $value and $primaryResource." +
+          " Expecting only one resource."
+        SparkSubmit.printErrorAndExit(error)
+      }
+      primaryResource = value
+      parseOpts(tail)
+
+    case Nil =>
   }
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+  private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+    val outStream = SparkSubmit.printStream
     if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
+      outStream.println("Unknown/unsupported param " + unknownParam)
     }
-    System.err.println(
+    outStream.println(
       """Usage: spark-submit <primary binary> [options]
         |Options:
         |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
@@ -171,6 +201,6 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                              working dir of each executor.""".stripMargin
     )
-    System.exit(exitCode)
+    SparkSubmit.exitFn()
   }
 }
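
parseOpts above uses the usual Scala idiom of pattern matching on the argument
list and recursing on the tail: a flag that consumes a value matches
name :: value :: tail, a bare flag matches name :: tail, and a trailing
catch-all picks up the primary resource. A stripped-down sketch of the idiom
(the option names and fields are illustrative):

    object ArgParse {
      var master: String = null
      var verbose: Boolean = false
      var resource: String = null

      def parse(opts: List[String]): Unit = opts match {
        case "--master" :: value :: tail =>    // flag that consumes a value
          master = value
          parse(tail)
        case ("--verbose" | "-v") :: tail =>   // bare flag
          verbose = true
          parse(tail)
        case value :: tail =>                  // anything else: the resource
          resource = value
          parse(tail)
        case Nil =>                            // done
      }
    }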

http://git-wip-us.apache.org/repos/asf/spark/blob/841721e0/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 29fef2e..4e489cd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -17,14 +17,71 @@
 
 package org.apache.spark.deploy
 
+import java.io.{OutputStream, PrintStream}
+
+import scala.collection.mutable.ArrayBuffer
+
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
+
 import org.apache.spark.deploy.SparkSubmit._
 
+
 class SparkSubmitSuite extends FunSuite with ShouldMatchers {
+
+  val noOpOutputStream = new OutputStream {
+    def write(b: Int) = {}
+  }
+
+  /** Simple PrintStream that captures printed lines into a buffer */
+  class BufferPrintStream extends PrintStream(noOpOutputStream) {
+    var lineBuffer = ArrayBuffer[String]()
+    override def println(line: String) {
+      lineBuffer += line
+    }
+  }
+
+  /** Returns true if the script exits and the given search string is printed. */
+  def testPrematureExit(input: Array[String], searchString: String): Boolean = {
+    val printStream = new BufferPrintStream()
+    SparkSubmit.printStream = printStream
+
+    @volatile var exitedCleanly = false
+    SparkSubmit.exitFn = () => exitedCleanly = true
+
+    val thread = new Thread {
+      override def run() = try {
+        SparkSubmit.main(input)
+      } catch {
+        // If exceptions occur after the "exit" has happened, fine to ignore them.
+        // These represent code paths not reachable during normal execution.
+        case e: Exception => if (!exitedCleanly) throw e
+      }
+    }
+    thread.start()
+    thread.join()
+    printStream.lineBuffer.exists(s => s.contains(searchString))
+  }
+
   test("prints usage on empty input") {
-    val clArgs = Array[String]()
-    // val appArgs = new SparkSubmitArguments(clArgs)
+    testPrematureExit(Array[String](), "Usage: spark-submit") should be (true)
+  }
+
+  test("prints usage with only --help") {
+    testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true)
+  }
+
+  test("handles multiple binary definitions") {
+    val adjacentJars = Array("foo.jar", "bar.jar")
+    testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true)
+
+    val nonAdjacentJars =
+      Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
+    testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") should be (true)
+  }
+
+  test("handle binary specified but not class") {
+    testPrematureExit(Array("foo.jar"), "must specify a main class")
   }
 
   test("handles YARN cluster mode") {

http://git-wip-us.apache.org/repos/asf/spark/blob/841721e0/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 71a64ec..0179b06 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -167,6 +167,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 object Client {
 
   def main(argStrings: Array[String]) {
+    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
+    println("Use ./bin/spark-submit with \"--master yarn\"")
+
     // Set an env variable indicating we are running in YARN mode.
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")

http://git-wip-us.apache.org/repos/asf/spark/blob/841721e0/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 837b7e1..77eb127 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -173,6 +173,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 object Client {
 
   def main(argStrings: Array[String]) {
+    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
+    println("Use ./bin/spark-submit with \"--master yarn\"")
+
     // Set an env variable indicating we are running in YARN mode.
     // Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
     // see Client#setupLaunchEnv().