Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/29 22:41:46 UTC

git commit: SPARK-1126. spark-app preliminary

Repository: spark
Updated Branches:
  refs/heads/master 3738f2442 -> 161781609


SPARK-1126.  spark-app preliminary

This is a starting version of the spark-app script for running compiled binaries against Spark.  It still needs tests and some polish.  The only testing I've done so far has been using it to launch jobs in yarn-standalone mode against a pseudo-distributed cluster.
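
For illustration, a yarn-standalone (cluster deploy mode) submission through the new script might look like the following; the jar, class name, and resource sizes are placeholders rather than anything shipped in this patch:

    ./bin/spark-submit my-app.jar \
      --master yarn --deploy-mode cluster \
      --class com.example.MyApp \
      --executor-memory 2g --num-executors 4 \
      --arg hdfs:///path/to/input.txt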

This leaves out the changes required for launching Python scripts.  I think it might be best to save those for another JIRA/PR (while keeping to the design so that they won't require backwards-incompatible changes).

Author: Sandy Ryza <sa...@cloudera.com>

Closes #86 from sryza/sandy-spark-1126 and squashes the following commits:

d428d85 [Sandy Ryza] Commenting, doc, and import fixes from Patrick's comments
e7315c6 [Sandy Ryza] Fix failing tests
34de899 [Sandy Ryza] Change --more-jars to --jars and fix docs
299ddca [Sandy Ryza] Fix scalastyle
a94c627 [Sandy Ryza] Add newline at end of SparkSubmit
04bc4e2 [Sandy Ryza] SPARK-1126. spark-submit script


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16178160
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16178160
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16178160

Branch: refs/heads/master
Commit: 1617816090e7b20124a512a43860a21232ebf511
Parents: 3738f24
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Sat Mar 29 14:41:36 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sat Mar 29 14:41:36 2014 -0700

----------------------------------------------------------------------
 bin/spark-submit                                |  38 ++++
 .../org/apache/spark/deploy/SparkSubmit.scala   | 212 +++++++++++++++++++
 .../spark/deploy/SparkSubmitArguments.scala     | 176 +++++++++++++++
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 121 +++++++++++
 docs/cluster-overview.md                        |  50 +++++
 docs/running-on-yarn.md                         |   6 +-
 docs/spark-standalone.md                        |   7 +-
 .../cluster/YarnClientSchedulerBackend.scala    |  45 ++--
 8 files changed, 630 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16178160/bin/spark-submit
----------------------------------------------------------------------
diff --git a/bin/spark-submit b/bin/spark-submit
new file mode 100755
index 0000000..d92d55a
--- /dev/null
+++ b/bin/spark-submit
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
+ORIG_ARGS=$@
+
+while (($#)); do
+  if [ "$1" = "--deploy-mode" ]; then
+    DEPLOY_MODE=$2
+  elif [ "$1" = "--driver-memory" ]; then
+    DRIVER_MEMORY=$2
+  fi
+
+  shift
+done
+
+if [ -n "$DRIVER_MEMORY" ] && [ "$DEPLOY_MODE" = "client" ]; then
+  export SPARK_MEM=$DRIVER_MEMORY
+fi
+
+$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit $ORIG_ARGS
+
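
In client deploy mode the wrapper above only scans the arguments for --deploy-mode and --driver-memory, exports SPARK_MEM accordingly, and passes every original argument on to spark-class. A usage sketch (jar and class names are placeholders):

    ./bin/spark-submit my-app.jar --deploy-mode client --driver-memory 2g --class com.example.MyApp
    # behaves like: SPARK_MEM=2g ./bin/spark-class org.apache.spark.deploy.SparkSubmit <same arguments>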

http://git-wip-us.apache.org/repos/asf/spark/blob/16178160/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
new file mode 100644
index 0000000..24a9c98
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+import java.net.URL
+
+import org.apache.spark.executor.ExecutorURLClassLoader
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
+/**
+ * Scala code behind the spark-submit script.  The script handles setting up the classpath with
+ * relevant Spark dependencies and provides a layer over the different cluster managers and deploy
+ * modes that Spark supports.
+ */
+object SparkSubmit {
+  val YARN = 1
+  val STANDALONE = 2
+  val MESOS = 4
+  val LOCAL = 8
+  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+
+  var clusterManager: Int = LOCAL
+
+  def main(args: Array[String]) {
+    val appArgs = new SparkSubmitArguments(args)
+    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    launch(childArgs, classpath, sysProps, mainClass)
+  }
+
+  /**
+   * @return
+   *         a tuple containing the arguments for the child, a list of classpath
+   *         entries for the child, a map of system properties, and the main class for the child
+   */
+  def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+      ArrayBuffer[String], Map[String, String], String) = {
+    if (appArgs.master.startsWith("yarn")) {
+      clusterManager = YARN
+    } else if (appArgs.master.startsWith("spark")) {
+      clusterManager = STANDALONE
+    } else if (appArgs.master.startsWith("mesos")) {
+      clusterManager = MESOS
+    } else if (appArgs.master.startsWith("local")) {
+      clusterManager = LOCAL
+    } else {
+      System.err.println("master must start with yarn, mesos, spark, or local")
+      System.exit(1)
+    }
+
+    // Because "yarn-standalone" and "yarn-client" encapsulate both the master
+    // and deploy mode, we have some logic to infer the master and deploy mode
+    // from each other if only one is specified, or exit early if they are at odds.
+    if (appArgs.deployMode == null && appArgs.master == "yarn-standalone") {
+      appArgs.deployMode = "cluster"
+    }
+    if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
+      System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
+      System.exit(1)
+    }
+    if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
+      System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
+      System.exit(1)
+    }
+    if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
+      appArgs.master = "yarn-standalone"
+    }
+    if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
+      appArgs.master = "yarn-client"
+    }
+
+    val deployOnCluster = Option(appArgs.deployMode).getOrElse("client") == "cluster"
+
+    val childClasspath = new ArrayBuffer[String]()
+    val childArgs = new ArrayBuffer[String]()
+    val sysProps = new HashMap[String, String]()
+    var childMainClass = ""
+
+    if (clusterManager == MESOS && deployOnCluster) {
+      System.err.println("Mesos does not support running the driver on the cluster")
+      System.exit(1)
+    }
+
+    if (!deployOnCluster) {
+      childMainClass = appArgs.mainClass
+      childClasspath += appArgs.primaryResource
+    } else if (clusterManager == YARN) {
+      childMainClass = "org.apache.spark.deploy.yarn.Client"
+      childArgs += ("--jar", appArgs.primaryResource)
+      childArgs += ("--class", appArgs.mainClass)
+    }
+
+    val options = List[OptionAssigner](
+      new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+      new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
+      new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
+      new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
+      new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
+      new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
+      new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
+      new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
+      new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
+        sysProp = "spark.executor.memory"),
+      new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
+      new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
+      new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
+      new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
+      new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
+        sysProp = "spark.cores.max"),
+      new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
+      new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
+      new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
+      new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
+      new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars")
+    )
+
+    // In client mode, add any extra user jars directly to the child classpath
+    if (appArgs.jars != null && !deployOnCluster) {
+      for (jar <- appArgs.jars.split(",")) {
+        childClasspath += jar
+      }
+    }
+
+    for (opt <- options) {
+      if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
+        (clusterManager & opt.clusterManager) != 0) {
+        if (opt.clOption != null) {
+          childArgs += (opt.clOption, opt.value)
+        } else if (opt.sysProp != null) {
+          sysProps.put(opt.sysProp, opt.value)
+        }
+      }
+    }
+
+    if (deployOnCluster && clusterManager == STANDALONE) {
+      if (appArgs.supervise) {
+        childArgs += "--supervise"
+      }
+
+      childMainClass = "org.apache.spark.deploy.Client"
+      childArgs += "launch"
+      childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
+    }
+
+    // Forward the application's own arguments to the child
+    if (appArgs.childArgs != null) {
+      if (!deployOnCluster || clusterManager == STANDALONE) {
+        childArgs ++= appArgs.childArgs
+      } else if (clusterManager == YARN) {
+        for (arg <- appArgs.childArgs) {
+          childArgs += ("--args", arg)
+        }
+      }
+    }
+
+    (childArgs, childClasspath, sysProps, childMainClass)
+  }
+
+  def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
+      sysProps: Map[String, String], childMainClass: String) {
+    val loader = new ExecutorURLClassLoader(new Array[URL](0),
+      Thread.currentThread.getContextClassLoader)
+    Thread.currentThread.setContextClassLoader(loader)
+
+    for (jar <- childClasspath) {
+      addJarToClasspath(jar, loader)
+    }
+
+    for ((key, value) <- sysProps) {
+      System.setProperty(key, value)
+    }
+
+    val mainClass = Class.forName(childMainClass, true, loader)
+    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
+    mainMethod.invoke(null, childArgs.toArray)
+  }
+
+  def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+    val localJarFile = new File(localJar)
+    if (!localJarFile.exists()) {
+      System.err.println("Jar does not exist: " + localJar + ". Skipping.")
+      return
+    }
+
+    val url = localJarFile.getAbsoluteFile.toURI.toURL
+    loader.addURL(url)
+  }
+}
+
+private[spark] class OptionAssigner(val value: String,
+  val clusterManager: Int,
+  val deployOnCluster: Boolean,
+  val clOption: String = null,
+  val sysProp: String = null
+) { }
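
For a sense of what createLaunchEnv produces (the names below are placeholders borrowed from the test suite), a YARN cluster-mode submission such as

    ./bin/spark-submit thejar.jar --master yarn --deploy-mode cluster --class org.SomeClass --arg arg1

ends up invoking org.apache.spark.deploy.yarn.Client as the child main class with arguments along the lines of "--jar thejar.jar --class org.SomeClass --args arg1", whereas in client mode org.SomeClass is run directly with "arg1" as its argument and thejar.jar on the child classpath.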

http://git-wip-us.apache.org/repos/asf/spark/blob/16178160/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
new file mode 100644
index 0000000..ff2aa68
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Parses and encapsulates arguments from the spark-submit script.
+ */
+private[spark] class SparkSubmitArguments(args: Array[String]) {
+  var master: String = "local"
+  var deployMode: String = null
+  var executorMemory: String = null
+  var executorCores: String = null
+  var totalExecutorCores: String = null
+  var driverMemory: String = null
+  var driverCores: String = null
+  var supervise: Boolean = false
+  var queue: String = null
+  var numExecutors: String = null
+  var files: String = null
+  var archives: String = null
+  var mainClass: String = null
+  var primaryResource: String = null
+  var name: String = null
+  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
+  var jars: String = null
+
+  loadEnvVars()
+  parseArgs(args.toList)
+
+  def loadEnvVars() {
+    master = Option(System.getenv("MASTER")).getOrElse(master)
+    deployMode = Option(System.getenv("DEPLOY_MODE")).getOrElse(deployMode)
+  }
+
+  def parseArgs(args: List[String]) {
+    if (args.size == 0) {
+      printUsageAndExit(1)
+      System.exit(1)
+    }
+    primaryResource = args(0)
+    parseOpts(args.tail)
+  }
+
+  def parseOpts(opts: List[String]): Unit = opts match {
+    case ("--name") :: value :: tail =>
+      name = value
+      parseOpts(tail)
+
+    case ("--master") :: value :: tail =>
+      master = value
+      parseOpts(tail)
+
+    case ("--class") :: value :: tail =>
+      mainClass = value
+      parseOpts(tail)
+
+    case ("--deploy-mode") :: value :: tail =>
+      if (value != "client" && value != "cluster") {
+        System.err.println("--deploy-mode must be either \"client\" or \"cluster\"")
+        System.exit(1)
+      }
+      deployMode = value
+      parseOpts(tail)
+
+    case ("--num-executors") :: value :: tail =>
+      numExecutors = value
+      parseOpts(tail)
+
+    case ("--total-executor-cores") :: value :: tail =>
+      totalExecutorCores = value
+      parseOpts(tail)
+
+    case ("--executor-cores") :: value :: tail =>
+      executorCores = value
+      parseOpts(tail)
+
+    case ("--executor-memory") :: value :: tail =>
+      executorMemory = value
+      parseOpts(tail)
+
+    case ("--driver-memory") :: value :: tail =>
+      driverMemory = value
+      parseOpts(tail)
+
+    case ("--driver-cores") :: value :: tail =>
+      driverCores = value
+      parseOpts(tail)
+
+    case ("--supervise") :: tail =>
+      supervise = true
+      parseOpts(tail)
+
+    case ("--queue") :: value :: tail =>
+      queue = value
+      parseOpts(tail)
+
+    case ("--files") :: value :: tail =>
+      files = value
+      parseOpts(tail)
+
+    case ("--archives") :: value :: tail =>
+      archives = value
+      parseOpts(tail)
+
+    case ("--arg") :: value :: tail =>
+      childArgs += value
+      parseOpts(tail)
+
+    case ("--jars") :: value :: tail =>
+      jars = value
+      parseOpts(tail)
+
+    case ("--help" | "-h") :: tail =>
+      printUsageAndExit(0)
+
+    case Nil =>
+
+    case _ =>
+      printUsageAndExit(1, opts)
+  }
+
+  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+    if (unknownParam != null) {
+      System.err.println("Unknown/unsupported param " + unknownParam)
+    }
+    System.err.println(
+      """Usage: spark-submit <primary binary> [options]
+        |Options:
+        |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
+        |  --deploy-mode DEPLOY_MODE   Mode to deploy the app in, either 'client' or 'cluster'.
+        |  --class CLASS_NAME          Name of your app's main class (required for Java apps).
+        |  --arg ARG                   Argument to be passed to your application's main class. This
+        |                              option can be specified multiple times for multiple args.
+        |  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 512M).
+        |  --name NAME                 The name of your application (Default: 'Spark').
+        |  --jars JARS                 A comma-separated list of local jars to include on the
+        |                              driver classpath and that SparkContext.addJar will work
+        |                              with. Doesn't work on standalone with 'cluster' deploy mode.
+        |
+        | Spark standalone with cluster deploy mode only:
+        |  --driver-cores NUM          Cores for driver (Default: 1).
+        |  --supervise                 If given, restarts the driver on failure.
+        |
+        | Spark standalone and Mesos only:
+        |  --total-executor-cores NUM  Total cores for all executors.
+        |
+        | YARN-only:
+        |  --executor-cores NUM        Number of cores per executor (Default: 1).
+        |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
+        |  --queue QUEUE_NAME          The YARN queue to submit to (Default: 'default').
+        |  --num-executors NUM         Number of executors to launch (Default: 2).
+        |  --files FILES               Comma separated list of files to be placed in the working dir
+        |                              of each executor.
+        |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
+        |                              working dir of each executor.""".stripMargin
+    )
+    System.exit(exitCode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/16178160/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
new file mode 100644
index 0000000..29fef2e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.deploy.SparkSubmit._
+
+class SparkSubmitSuite extends FunSuite with ShouldMatchers {
+  test("prints usage on empty input") {
+    val clArgs = Array[String]()
+    // val appArgs = new SparkSubmitArguments(clArgs)
+  }
+
+  test("handles YARN cluster mode") {
+    val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
+      "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
+      "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
+      "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
+      "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    val childArgsStr = childArgs.mkString(" ")
+    childArgsStr should include ("--jar thejar.jar")
+    childArgsStr should include ("--class org.SomeClass")
+    childArgsStr should include ("--addJars one.jar,two.jar,three.jar")
+    childArgsStr should include ("--executor-memory 5g")
+    childArgsStr should include ("--driver-memory 4g")
+    childArgsStr should include ("--executor-cores 5")
+    childArgsStr should include ("--args arg1 --args arg2")
+    childArgsStr should include ("--queue thequeue")
+    childArgsStr should include ("--files file1.txt,file2.txt")
+    childArgsStr should include ("--archives archive1.txt,archive2.txt")
+    childArgsStr should include ("--num-executors 6")
+    mainClass should be ("org.apache.spark.deploy.yarn.Client")
+    classpath should have length (0)
+    sysProps should have size (0)
+  }
+
+  test("handles YARN client mode") {
+    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+      "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
+      "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
+      "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
+      "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    childArgs.mkString(" ") should be ("arg1 arg2")
+    mainClass should be ("org.SomeClass")
+    classpath should contain ("thejar.jar")
+    classpath should contain ("one.jar")
+    classpath should contain ("two.jar")
+    classpath should contain ("three.jar")
+    sysProps("spark.executor.memory") should be ("5g")
+    sysProps("spark.executor.cores") should be ("5")
+    sysProps("spark.yarn.queue") should be ("thequeue")
+    sysProps("spark.yarn.dist.files") should be ("file1.txt,file2.txt")
+    sysProps("spark.yarn.dist.archives") should be ("archive1.txt,archive2.txt")
+    sysProps("spark.executor.instances") should be ("6")
+  }
+
+  test("handles standalone cluster mode") {
+    val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
+      "--master", "spark://h:p", "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
+      "--supervise", "--driver-memory", "4g", "--driver-cores", "5")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    val childArgsStr = childArgs.mkString(" ")
+    print("child args: " + childArgsStr)
+    childArgsStr.startsWith("--memory 4g --cores 5 --supervise") should be (true)
+    childArgsStr should include ("launch spark://h:p thejar.jar org.SomeClass arg1 arg2")
+    mainClass should be ("org.apache.spark.deploy.Client")
+    classpath should have length (0)
+    sysProps should have size (0)
+  }
+
+  test("handles standalone client mode") {
+    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+      "--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
+      "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
+      "--driver-memory", "4g")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    childArgs.mkString(" ") should be ("arg1 arg2")
+    mainClass should be ("org.SomeClass")
+    classpath should contain ("thejar.jar")
+    sysProps("spark.executor.memory") should be ("5g")
+    sysProps("spark.cores.max") should be ("5")
+  }
+
+  test("handles mesos client mode") {
+    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+      "--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
+      "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
+      "--driver-memory", "4g")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
+    childArgs.mkString(" ") should be ("arg1 arg2")
+    mainClass should be ("org.SomeClass")
+    classpath should contain ("thejar.jar")
+    sysProps("spark.executor.memory") should be ("5g")
+    sysProps("spark.cores.max") should be ("5")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/16178160/docs/cluster-overview.md
----------------------------------------------------------------------
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index a555a7b..b69e341 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -50,6 +50,50 @@ The system currently supports three cluster managers:
 In addition, Spark's [EC2 launch scripts](ec2-scripts.html) make it easy to launch a standalone
 cluster on Amazon EC2.
 
+# Launching Applications
+
+The recommended way to launch a compiled Spark application is through the spark-submit script (located in the
+bin directory), which takes care of setting up the classpath with Spark and its dependencies, and provides a
+layer over the different cluster managers and deploy modes that Spark supports.  Its usage is
+
+  spark-submit `<jar>` `<options>`
+
+Where options are any of:
+
+- **\--class** - The main class to run.
+- **\--master** - The URL of the cluster manager master, e.g. spark://host:port, mesos://host:port, yarn,
+  or local.
+- **\--deploy-mode** - "client" to run the driver in the client process or "cluster" to run the driver in
+  a process on the cluster.  For Mesos, only "client" is supported.
+- **\--executor-memory** - Memory per executor (e.g. 1000M, 2G).
+- **\--executor-cores** - Number of cores per executor (Default: 1).
+- **\--driver-memory** - Memory for driver (e.g. 1000M, 2G)
+- **\--name** - Name of the application.
+- **\--arg** - Argument to be passed to the application's main class. This option can be specified
+  multiple times to pass multiple arguments.
+- **\--jars** - A comma-separated list of local jars to include on the driver classpath and that
+  SparkContext.addJar will work with. Doesn't work on standalone with 'cluster' deploy mode.
+
+The following currently only work for Spark standalone with cluster deploy mode:
+
+- **\--driver-cores** - Cores for driver (Default: 1).
+- **\--supervise** - If given, restarts the driver on failure.
+
+The following works for Spark standalone and Mesos only:
+
+- **\--total-executor-cores** - Total cores for all executors.
+
+The following currently only work for YARN:
+
+- **\--queue** - The YARN queue to place the application in.
+- **\--files** - Comma separated list of files to be placed in the working dir of each executor.
+- **\--archives** - Comma separated list of archives to be extracted into the working dir of each
+  executor.
+- **\--num-executors** - Number of executors (Default: 2).
+
+The master and deploy mode can also be set with the MASTER and DEPLOY_MODE environment variables.
+Values for these options passed via command line will override the environment variables.
+
 # Shipping Code to the Cluster
 
 The recommended way to ship your code to the cluster is to pass it through SparkContext's constructor,
@@ -103,6 +147,12 @@ The following table summarizes terms you'll see used to refer to cluster concept
       <td>An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)</td>
     </tr>
     <tr>
+      <td>Deploy mode</td>
+      <td>Distinguishes where the driver process runs. In "cluster" mode, the framework launches
+        the driver inside of the cluster. In "client" mode, the submitter launches the driver
+        outside of the cluster.</td>
+    </tr>
+    <tr>
       <td>Worker node</td>
       <td>Any node that can run application code in the cluster</td>
     </tr>
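
As noted above, MASTER and DEPLOY_MODE may also be supplied as environment variables, with command-line flags taking precedence. A minimal sketch (jar and class names are placeholders):

    MASTER=spark://host:7077 DEPLOY_MODE=client ./bin/spark-submit my-app.jar --class com.example.MyApp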

http://git-wip-us.apache.org/repos/asf/spark/blob/16178160/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 2e9dec4..d8657c4 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -48,10 +48,12 @@ System Properties:
 Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster.
 These configs are used to connect to the cluster, write to the dfs, and connect to the YARN ResourceManager.
 
-There are two scheduler modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
+There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
 
 Unlike in Spark standalone and Mesos mode, in which the master's address is specified in the "master" parameter, in YARN mode the ResourceManager's address is picked up from the Hadoop configuration.  Thus, the master parameter is simply "yarn-client" or "yarn-cluster".
 
+The spark-submit script described in the [cluster mode overview](cluster-overview.html) provides the most straightforward way to submit a compiled Spark application to YARN in either deploy mode. For info on the lower-level invocations it uses, read ahead. For running spark-shell against YARN, skip down to the yarn-client section. 
+
 ## Launching a Spark application with yarn-cluster mode.
 
 The command to launch the Spark application on the cluster is as follows:
@@ -121,7 +123,7 @@ or
     MASTER=yarn-client ./bin/spark-shell
 
 
-## Viewing logs
+# Viewing logs
 
 In YARN terminology, executors and application masters run inside "containers". YARN has two modes for handling container logs after an application has completed. If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted on the local machine. These logs can be viewed from anywhere on the cluster with the "yarn logs" command.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/16178160/docs/spark-standalone.md
----------------------------------------------------------------------
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 51fb3a4..7e4eea3 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -146,10 +146,13 @@ automatically set MASTER from the `SPARK_MASTER_IP` and `SPARK_MASTER_PORT` vari
 
 You can also pass an option `-c <numCores>` to control the number of cores that spark-shell uses on the cluster.
 
-# Launching Applications Inside the Cluster
+# Launching Compiled Spark Applications
 
-You may also run your application entirely inside of the cluster by submitting your application driver using the submission client. The syntax for submitting applications is as follows:
+Spark supports two deploy modes. Spark applications may run with the driver inside the client process or entirely inside the cluster.
 
+The spark-submit script described in the [cluster mode overview](cluster-overview.html) provides the most straightforward way to submit a compiled Spark application to the cluster in either deploy mode. For info on the lower-level invocations used to launch an app inside the cluster, read ahead.
+
+## Launching Applications Inside the Cluster
 
     ./bin/spark-class org.apache.spark.deploy.Client launch
        [client-options] \

http://git-wip-us.apache.org/repos/asf/spark/blob/16178160/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d1f13e3..1619188 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -33,11 +33,12 @@ private[spark] class YarnClientSchedulerBackend(
   var client: Client = null
   var appId: ApplicationId = null
 
-  private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) {
-    Option(System.getenv(optionalParam)) foreach {
-      optParam => {
-        arrayBuf += (optionName, optParam)
-      }
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
+      arrayBuf: ArrayBuffer[String]) {
+    if (System.getProperty(sysProp) != null) {
+      arrayBuf += (optionName, System.getProperty(sysProp))
+    } else if (System.getenv(envVar) != null) {
+      arrayBuf += (optionName, System.getenv(envVar))
     }
   }
 
@@ -56,22 +57,24 @@ private[spark] class YarnClientSchedulerBackend(
       "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
     )
 
-    // process any optional arguments, use the defaults already defined in ClientArguments 
-    // if things aren't specified
-    Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
-      "SPARK_DRIVER_MEMORY" -> "--driver-memory",
-      "SPARK_WORKER_INSTANCES" -> "--num-executors",
-      "SPARK_WORKER_MEMORY" -> "--executor-memory",
-      "SPARK_WORKER_CORES" -> "--executor-cores",
-      "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
-      "SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
-      "SPARK_EXECUTOR_CORES" -> "--executor-cores",
-      "SPARK_YARN_QUEUE" -> "--queue",
-      "SPARK_YARN_APP_NAME" -> "--name",
-      "SPARK_YARN_DIST_FILES" -> "--files",
-      "SPARK_YARN_DIST_ARCHIVES" -> "--archives")
-    .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
-      
+    // process any optional arguments, given either as environment variables
+    // or system properties. use the defaults already defined in ClientArguments
+    // if things aren't specified. system properties override environment
+    // variables.
+    List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+      ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"),
+      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+      ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+      ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+      ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
+      ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
+      ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
+    .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
+
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     client = new Client(args, conf)