Posted to commits@spark.apache.org by pw...@apache.org on 2015/02/07 00:57:08 UTC

[2/2] spark git commit: [SPARK-5388] Provide a stable application submission gateway for standalone cluster mode

[SPARK-5388] Provide a stable application submission gateway for standalone cluster mode

The goal is to provide a stable, REST-based application submission gateway that is not inherently based on Akka, which is unstable across versions. This PR targets standalone cluster mode, but is implemented in a manner general enough that it can potentially be extended to other modes in the future. Client mode is currently not included in the changes here because many more Akka messages are exchanged there.

As of the changes here, the Master will advertise two ports, 7077 and 6066. We need to keep the old one (7077) around for client mode and for older versions of Spark submit; all new versions of Spark submit will use the REST gateway (6066).
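
To make the new gateway concrete, here is a rough sketch of a submission through the REST client added in this patch (the host, jar, and class names are hypothetical; the client is private[spark], so users normally reach it through Spark submit):

    import org.apache.spark.deploy.rest.StandaloneRestClient

    val client = new StandaloneRestClient
    val request = client.constructSubmitRequest(
      appResource = "/path/to/examples.jar",             // hypothetical application jar
      mainClass = "org.apache.spark.examples.SparkPi",   // hypothetical main class
      appArgs = Array("10"),
      sparkProperties = Map(
        "spark.app.name" -> "SparkPi",
        "spark.master" -> "spark://master-host:6066"),   // hypothetical master, REST port
      environmentVariables = Map.empty)
    client.createSubmission("spark://master-host:6066", request)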

By the way, this includes ~700 lines of tests and ~200 lines of license headers.

Author: Andrew Or <an...@databricks.com>

Closes #4216 from andrewor14/rest and squashes the following commits:

8d7ce07 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
6f0c597 [Andrew Or] Use nullable fields for integer and boolean values
dfe4bd7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
b9e2a08 [Andrew Or] Minor comments
02b5cea [Andrew Or] Fix tests
d2b1ef8 [Andrew Or] Comment changes + minor code refactoring across the board
9c82a36 [Andrew Or] Minor comment and wording updates
b4695e7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
c9a8ad7 [Andrew Or] Do not include appResource and mainClass as properties
6fc7670 [Andrew Or] Report REST server response back to the user
40e6095 [Andrew Or] Pass submit parameters through system properties
cbd670b [Andrew Or] Include unknown fields, if any, in server response
9fee16f [Andrew Or] Include server protocol version on mismatch
09f873a [Andrew Or] Fix style
8188e61 [Andrew Or] Upgrade Jackson from 2.3.0 to 2.4.4
37538e0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
9165ae8 [Andrew Or] Fall back to Akka if endpoint was not REST
252d53c [Andrew Or] Clean up server error handling behavior further
c643f64 [Andrew Or] Fix style
bbbd329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
792e112 [Andrew Or] Use specific HTTP response codes on error
f98660b [Andrew Or] Version the protocol and include it in REST URL
721819f [Andrew Or] Provide more REST-like interface for submit/kill/status
581f7bf [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
9e0d1af [Andrew Or] Move some classes around to reduce number of files (minor)
42e5de4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
1f1c03f [Andrew Or] Use Jackson's DefaultScalaModule to simplify messages
9229433 [Andrew Or] Reduce duplicate naming in REST field
ade28fd [Andrew Or] Clean up REST response output in Spark submit
b2fef8b [Andrew Or] Abstract the success field to the general response
6c57b4b [Andrew Or] Increase timeout in end-to-end tests
bf696ff [Andrew Or] Add checks for enabling REST when using kill/status
7ee6737 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e2f7f5f [Andrew Or] Provide more safeguard against missing fields
9581df7 [Andrew Or] Clean up uses of exceptions
914fdff [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e2104e6 [Andrew Or] stable -> rest
3db7379 [Andrew Or] Fix comments and name fields for better error messages
8d43486 [Andrew Or] Replace SubmitRestProtocolAction with class name
df90e8b [Andrew Or] Use Jackson for JSON de/serialization
d7a1f9f [Andrew Or] Fix local cluster tests
efa5e18 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e42c131 [Andrew Or] Add end-to-end tests for standalone REST protocol
837475b [Andrew Or] Show the REST port on the Master UI
d8d3717 [Andrew Or] Use a daemon thread pool for REST server
6568ca5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
77774ba [Andrew Or] Minor fixes
206cae4 [Andrew Or] Refactor and add tests for the REST protocol
63c05b3 [Andrew Or] Remove MASTER as a field (minor)
9e21b72 [Andrew Or] Action -> SparkSubmitAction (minor)
51c5ca6 [Andrew Or] Distinguish client and server side Spark versions
b44e103 [Andrew Or] Implement status requests + fix validation behavior
120ab9d [Andrew Or] Support kill and request driver status through SparkSubmit
544de1d [Andrew Or] Major clean ups in code and comments
e958cae [Andrew Or] Supported nested values in messages
484bd21 [Andrew Or] Specify an ordering for fields in SubmitDriverRequestMessage
6ff088d [Andrew Or] Rename classes to generalize REST protocol
af9d9cb [Andrew Or] Integrate REST protocol in standalone mode
53e7c0e [Andrew Or] Initial client, server, and all the messages


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1390e56f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1390e56f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1390e56f

Branch: refs/heads/master
Commit: 1390e56fa86d07b588f0c3a1a2cc25ab5bb0db67
Parents: e772b4e
Author: Andrew Or <an...@databricks.com>
Authored: Fri Feb 6 15:57:06 2015 -0800
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Fri Feb 6 15:57:06 2015 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |   8 +
 .../org/apache/spark/ui/static/webui.css        |   6 +
 .../scala/org/apache/spark/SparkContext.scala   |   2 +-
 .../apache/spark/deploy/ClientArguments.scala   |  20 +-
 .../org/apache/spark/deploy/DeployMessage.scala |  15 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |  12 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   | 142 ++++--
 .../spark/deploy/SparkSubmitArguments.scala     |  81 +++-
 .../org/apache/spark/deploy/master/Master.scala |  46 +-
 .../spark/deploy/master/MasterMessages.scala    |   4 +-
 .../spark/deploy/master/ui/MasterPage.scala     |  18 +-
 .../deploy/rest/StandaloneRestClient.scala      | 307 +++++++++++++
 .../deploy/rest/StandaloneRestServer.scala      | 449 +++++++++++++++++++
 .../rest/SubmitRestProtocolException.scala      |  36 ++
 .../deploy/rest/SubmitRestProtocolMessage.scala | 146 ++++++
 .../deploy/rest/SubmitRestProtocolRequest.scala |  78 ++++
 .../rest/SubmitRestProtocolResponse.scala       |  85 ++++
 .../apache/spark/deploy/JsonProtocolSuite.scala |   3 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  57 ++-
 .../deploy/rest/StandaloneRestSubmitSuite.scala | 265 +++++++++++
 .../deploy/rest/SubmitRestProtocolSuite.scala   | 324 +++++++++++++
 .../KryoSerializerDistributedSuite.scala        |   6 +-
 pom.xml                                         |  11 +
 23 files changed, 2027 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 4daaf88..6618003 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -244,6 +244,14 @@
       <artifactId>metrics-graphite</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.module</groupId>
+      <artifactId>jackson-module-scala_2.10</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
       <scope>test</scope>
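
These Jackson dependencies back the JSON (de)serialization of the REST protocol messages (see SubmitRestProtocolMessage below). A minimal sketch of the kind of setup they enable, with an illustrative payload:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Register the Scala module so Scala maps and options serialize cleanly
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = mapper.writeValueAsString(Map("action" -> "CreateSubmissionRequest"))
    // json: {"action":"CreateSubmissionRequest"}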

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/resources/org/apache/spark/ui/static/webui.css
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index f23ba9d..68b33b5 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -103,6 +103,12 @@ span.expand-details {
   float: right;
 }
 
+span.rest-uri {
+  font-size: 10pt;
+  font-style: italic;
+  color: gray;
+}
+
 pre {
   font-size: 0.8em;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5623587..71bdbc9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2110,7 +2110,7 @@ object SparkContext extends Logging {
 
         val scheduler = new TaskSchedulerImpl(sc)
         val localCluster = new LocalSparkCluster(
-          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
+          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
         val masterUrls = localCluster.start()
         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index e5873ce..415bd50 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -29,8 +29,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
  * Command-line parser for the driver client.
  */
 private[spark] class ClientArguments(args: Array[String]) {
-  val defaultCores = 1
-  val defaultMemory = 512
+  import ClientArguments._
 
   var cmd: String = "" // 'launch' or 'kill'
   var logLevel = Level.WARN
@@ -39,9 +38,9 @@ private[spark] class ClientArguments(args: Array[String]) {
   var master: String = ""
   var jarUrl: String = ""
   var mainClass: String = ""
-  var supervise: Boolean = false
-  var memory: Int = defaultMemory
-  var cores: Int = defaultCores
+  var supervise: Boolean = DEFAULT_SUPERVISE
+  var memory: Int = DEFAULT_MEMORY
+  var cores: Int = DEFAULT_CORES
   private var _driverOptions = ListBuffer[String]()
   def driverOptions = _driverOptions.toSeq
 
@@ -50,7 +49,7 @@ private[spark] class ClientArguments(args: Array[String]) {
 
   parse(args.toList)
 
-  def parse(args: List[String]): Unit = args match {
+  private def parse(args: List[String]): Unit = args match {
     case ("--cores" | "-c") :: IntParam(value) :: tail =>
       cores = value
       parse(tail)
@@ -106,9 +105,10 @@ private[spark] class ClientArguments(args: Array[String]) {
       |Usage: DriverClient kill <active-master> <driver-id>
       |
       |Options:
-      |   -c CORES, --cores CORES        Number of cores to request (default: $defaultCores)
-      |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $defaultMemory)
+      |   -c CORES, --cores CORES        Number of cores to request (default: $DEFAULT_CORES)
+      |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $DEFAULT_MEMORY)
       |   -s, --supervise                Whether to restart the driver on failure
+      |                                  (default: $DEFAULT_SUPERVISE)
       |   -v, --verbose                  Print more debugging output
      """.stripMargin
     System.err.println(usage)
@@ -117,6 +117,10 @@ private[spark] class ClientArguments(args: Array[String]) {
 }
 
 object ClientArguments {
+  private[spark] val DEFAULT_CORES = 1
+  private[spark] val DEFAULT_MEMORY = 512 // MB
+  private[spark] val DEFAULT_SUPERVISE = false
+
   def isValidJarUrl(s: String): Boolean = {
     try {
       val uri = new URI(s)

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 243d8ed..7f600d8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -148,15 +148,22 @@ private[deploy] object DeployMessages {
 
   // Master to MasterWebUI
 
-  case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
-    activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
-    activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
-    status: MasterState) {
+  case class MasterStateResponse(
+      host: String,
+      port: Int,
+      restPort: Option[Int],
+      workers: Array[WorkerInfo],
+      activeApps: Array[ApplicationInfo],
+      completedApps: Array[ApplicationInfo],
+      activeDrivers: Array[DriverInfo],
+      completedDrivers: Array[DriverInfo],
+      status: MasterState) {
 
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
 
     def uri = "spark://" + host + ":" + port
+    def restUri: Option[String] = restPort.map { p => "spark://" + host + ":" + p }
   }
 
   //  WorkerWebUI to Worker

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 9a7a113..0401b15 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -33,7 +33,11 @@ import org.apache.spark.util.Utils
  * fault recovery without spinning up a lot of processes.
  */
 private[spark]
-class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int)
+class LocalSparkCluster(
+    numWorkers: Int,
+    coresPerWorker: Int,
+    memoryPerWorker: Int,
+    conf: SparkConf)
   extends Logging {
 
   private val localHostname = Utils.localHostName()
@@ -43,9 +47,11 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
   def start(): Array[String] = {
     logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
 
+    // Disable REST server on Master in this mode unless otherwise specified
+    val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false")
+
     /* Start the Master */
-    val conf = new SparkConf(false)
-    val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
+    val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
     masterActorSystems += masterSystem
     val masterUrl = "spark://" + localHostname + ":" + masterPort
     val masters = Array(masterUrl)

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9d25e64..6d21392 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,25 +18,35 @@
 package org.apache.spark.deploy
 
 import java.io.{File, PrintStream}
-import java.lang.reflect.{Modifier, InvocationTargetException}
+import java.lang.reflect.{InvocationTargetException, Modifier}
 import java.net.URL
+
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.hadoop.fs.Path
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
-import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
-import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
+import org.apache.ivy.core.module.descriptor._
+import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
 import org.apache.ivy.core.report.ResolveReport
-import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
+import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
-import org.apache.spark.executor.ExecutorURLClassLoader
+
+import org.apache.spark.deploy.rest._
+import org.apache.spark.executor._
 import org.apache.spark.util.Utils
-import org.apache.spark.executor.ChildExecutorURLClassLoader
-import org.apache.spark.executor.MutableURLClassLoader
+
+/**
+ * Whether to submit, kill, or request the status of an application.
+ * The latter two operations are currently supported only for standalone cluster mode.
+ */
+private[spark] object SparkSubmitAction extends Enumeration {
+  type SparkSubmitAction = Value
+  val SUBMIT, KILL, REQUEST_STATUS = Value
+}
 
 /**
  * Main gateway of launching a Spark application.
@@ -83,21 +93,74 @@ object SparkSubmit {
     if (appArgs.verbose) {
       printStream.println(appArgs)
     }
-    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
-    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
+    appArgs.action match {
+      case SparkSubmitAction.SUBMIT => submit(appArgs)
+      case SparkSubmitAction.KILL => kill(appArgs)
+      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
+    }
+  }
+
+  /** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
+  private def kill(args: SparkSubmitArguments): Unit = {
+    new StandaloneRestClient()
+      .killSubmission(args.master, args.submissionToKill)
   }
 
   /**
-   * @return a tuple containing
-   *           (1) the arguments for the child process,
-   *           (2) a list of classpath entries for the child,
-   *           (3) a list of system properties and env vars, and
-   *           (4) the main class for the child
+   * Request the status of an existing submission using the REST protocol.
+   * Standalone cluster mode only.
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
+  private def requestStatus(args: SparkSubmitArguments): Unit = {
+    new StandaloneRestClient()
+      .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
+  }
 
-    // Values to return
+  /**
+   * Submit the application using the provided parameters.
+   *
+   * This runs in two steps. First, we prepare the launch environment by setting up
+   * the appropriate classpath, system properties, and application arguments for
+   * running the child main class based on the cluster manager and the deploy mode.
+   * Second, we use this launch environment to invoke the main method of the child
+   * main class.
+   */
+  private[spark] def submit(args: SparkSubmitArguments): Unit = {
+    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
+     // In standalone cluster mode, there are two submission gateways:
+     //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+     //   (2) The new REST-based gateway introduced in Spark 1.3
+     // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
+     // to use the legacy gateway if the master endpoint turns out to be not a REST server.
+    if (args.isStandaloneCluster && args.useRest) {
+      try {
+        printStream.println("Running Spark using the REST application submission protocol.")
+        runMain(childArgs, childClasspath, sysProps, childMainClass)
+      } catch {
+        // Fail over to use the legacy submission gateway
+        case e: SubmitRestConnectionException =>
+          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
+            "Falling back to legacy submission gateway instead.")
+          args.useRest = false
+          submit(args)
+      }
+    // In all other modes, just run the main class as prepared
+    } else {
+      runMain(childArgs, childClasspath, sysProps, childMainClass)
+    }
+  }
+
+  /**
+   * Prepare the environment for submitting an application.
+   * This returns a 4-tuple:
+   *   (1) the arguments for the child process,
+   *   (2) a list of classpath entries for the child,
+   *   (3) a map of system properties, and
+   *   (4) the main class for the child
+   * Exposed for testing.
+   */
+  private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)
+      : (Seq[String], Seq[String], Map[String, String], String) = {
+    // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
@@ -235,10 +298,13 @@ object SparkSubmit {
         sysProp = "spark.driver.extraLibraryPath"),
 
       // Standalone cluster only
+      // Do not set CL arguments here because there are multiple possibilities for the main class
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
       OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
+      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
+      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
+      OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
+        sysProp = "spark.driver.supervise"),
 
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
@@ -279,7 +345,6 @@ object SparkSubmit {
       if (args.childArgs != null) { childArgs ++= args.childArgs }
     }
 
-
     // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null &&
@@ -301,14 +366,21 @@ object SparkSubmit {
       sysProps.put("spark.jars", jars.mkString(","))
     }
 
-    // In standalone-cluster mode, use Client as a wrapper around the user class
-    if (clusterManager == STANDALONE && deployMode == CLUSTER) {
-      childMainClass = "org.apache.spark.deploy.Client"
-      if (args.supervise) {
-        childArgs += "--supervise"
+    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
+    // All Spark parameters are expected to be passed to the client through system properties.
+    if (args.isStandaloneCluster) {
+      if (args.useRest) {
+        childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
+        childArgs += (args.primaryResource, args.mainClass)
+      } else {
+        // In legacy standalone cluster mode, use Client as a wrapper around the user class
+        childMainClass = "org.apache.spark.deploy.Client"
+        if (args.supervise) { childArgs += "--supervise" }
+        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
+        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
+        childArgs += "launch"
+        childArgs += (args.master, args.primaryResource, args.mainClass)
       }
-      childArgs += "launch"
-      childArgs += (args.master, args.primaryResource, args.mainClass)
       if (args.childArgs != null) {
         childArgs ++= args.childArgs
       }
@@ -345,7 +417,7 @@ object SparkSubmit {
 
     // Ignore invalid spark.driver.host in cluster modes.
     if (deployMode == CLUSTER) {
-      sysProps -= ("spark.driver.host")
+      sysProps -= "spark.driver.host"
     }
 
     // Resolve paths in certain spark properties
@@ -374,9 +446,15 @@ object SparkSubmit {
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
-  private def launch(
-      childArgs: ArrayBuffer[String],
-      childClasspath: ArrayBuffer[String],
+  /**
+   * Run the main method of the child class using the provided launch environment.
+   *
+   * Note that this main class will not be the one provided by the user if we're
+   * running cluster deploy mode or python applications.
+   */
+  private def runMain(
+      childArgs: Seq[String],
+      childClasspath: Seq[String],
       sysProps: Map[String, String],
       childMainClass: String,
       verbose: Boolean = false) {
@@ -697,7 +775,7 @@ private[spark] object SparkSubmitUtils {
  * Provides an indirection layer for passing arguments as system properties or flags to
  * the user's driver program or to downstream launcher tools.
  */
-private[spark] case class OptionAssigner(
+private case class OptionAssigner(
     value: String,
     clusterManager: Int,
     deployMode: Int,
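
As the doc comment on prepareSubmitEnvironment notes, the method is exposed for testing. A hedged sketch of test-style usage (argument values are illustrative):

    val args = new SparkSubmitArguments(Seq(
      "--deploy-mode", "cluster",
      "--master", "spark://master-host:7077",
      "--class", "org.example.MyApp",
      "/path/to/app.jar"))
    val (childArgs, classpath, sysProps, mainClass) = SparkSubmit.prepareSubmitEnvironment(args)
    // In standalone cluster mode with REST enabled, mainClass is
    // org.apache.spark.deploy.rest.StandaloneRestClient, and the driver memory, cores,
    // and supervise settings travel through sysProps rather than command-line arguments.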

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 5cadc53..bd0ae26 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -22,6 +22,7 @@ import java.util.jar.JarFile
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
+import org.apache.spark.deploy.SparkSubmitAction._
 import org.apache.spark.util.Utils
 
 /**
@@ -39,8 +40,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var driverExtraClassPath: String = null
   var driverExtraLibraryPath: String = null
   var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
   var queue: String = null
   var numExecutors: String = null
   var files: String = null
@@ -56,8 +55,16 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
+  var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
 
+  // Standalone cluster mode only
+  var supervise: Boolean = false
+  var driverCores: String = null
+  var submissionToKill: String = null
+  var submissionToRequestStatusFor: String = null
+  var useRest: Boolean = true // used internally
+
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
@@ -82,7 +89,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   // Use `sparkProperties` map along with env vars to fill in any missing parameters
   loadEnvironmentArguments()
 
-  checkRequiredArguments()
+  validateArguments()
 
   /**
    * Merge values from the default properties file with those specified through --conf.
@@ -107,6 +114,15 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       .orElse(sparkProperties.get("spark.master"))
       .orElse(env.get("MASTER"))
       .orNull
+    driverExtraClassPath = Option(driverExtraClassPath)
+      .orElse(sparkProperties.get("spark.driver.extraClassPath"))
+      .orNull
+    driverExtraJavaOptions = Option(driverExtraJavaOptions)
+      .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
+      .orNull
+    driverExtraLibraryPath = Option(driverExtraLibraryPath)
+      .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
+      .orNull
     driverMemory = Option(driverMemory)
       .orElse(sparkProperties.get("spark.driver.memory"))
       .orElse(env.get("SPARK_DRIVER_MEMORY"))
@@ -166,10 +182,21 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     if (name == null && primaryResource != null) {
       name = Utils.stripDirectory(primaryResource)
     }
+
+    // Action should be SUBMIT unless otherwise specified
+    action = Option(action).getOrElse(SUBMIT)
   }
 
   /** Ensure that required fields exists. Call this only once all defaults are loaded. */
-  private def checkRequiredArguments(): Unit = {
+  private def validateArguments(): Unit = {
+    action match {
+      case SUBMIT => validateSubmitArguments()
+      case KILL => validateKillArguments()
+      case REQUEST_STATUS => validateStatusRequestArguments()
+    }
+  }
+
+  private def validateSubmitArguments(): Unit = {
     if (args.length == 0) {
       printUsageAndExit(-1)
     }
@@ -192,6 +219,29 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     }
   }
 
+  private def validateKillArguments(): Unit = {
+    if (!master.startsWith("spark://")) {
+      SparkSubmit.printErrorAndExit("Killing submissions is only supported in standalone mode!")
+    }
+    if (submissionToKill == null) {
+      SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
+    }
+  }
+
+  private def validateStatusRequestArguments(): Unit = {
+    if (!master.startsWith("spark://")) {
+      SparkSubmit.printErrorAndExit(
+        "Requesting submission statuses is only supported in standalone mode!")
+    }
+    if (submissionToRequestStatusFor == null) {
+      SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
+    }
+  }
+
+  def isStandaloneCluster: Boolean = {
+    master.startsWith("spark://") && deployMode == "cluster"
+  }
+
   override def toString = {
     s"""Parsed arguments:
     |  master                  $master
@@ -300,6 +350,22 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         propertiesFile = value
         parse(tail)
 
+      case ("--kill") :: value :: tail =>
+        submissionToKill = value
+        if (action != null) {
+          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
+        }
+        action = KILL
+        parse(tail)
+
+      case ("--status") :: value :: tail =>
+        submissionToRequestStatusFor = value
+        if (action != null) {
+          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
+        }
+        action = REQUEST_STATUS
+        parse(tail)
+
       case ("--supervise") :: tail =>
         supervise = true
         parse(tail)
@@ -372,7 +438,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit [options] <app jar | python file> [app options]
+      """Usage: spark-submit [options] <app jar | python file> [app arguments]
+        |Usage: spark-submit --kill [submission ID] --master [spark://...]
+        |Usage: spark-submit --status [submission ID] --master [spark://...]
+        |
         |Options:
         |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
         |  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
@@ -413,6 +482,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         | Spark standalone with cluster deploy mode only:
         |  --driver-cores NUM          Cores for driver (Default: 1).
         |  --supervise                 If given, restarts the driver on failure.
+        |  --kill SUBMISSION_ID        If given, kills the driver specified.
+        |  --status SUBMISSION_ID      If given, requests the status of the driver specified.
         |
         | Spark standalone and Mesos only:
         |  --total-executor-cores NUM  Total cores for all executors.
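
Tying the new usage strings together, killing or checking on a cluster-mode driver from the command line looks roughly like this (the submission ID is illustrative; the master URL points at the REST port):

    ./bin/spark-submit --kill driver-20150206-0000 --master spark://master-host:6066
    ./bin/spark-submit --status driver-20150206-0000 --master spark://master-host:6066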

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5eeb9fe..b8b1a25 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -43,6 +43,7 @@ import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
+import org.apache.spark.deploy.rest.StandaloneRestServer
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
@@ -52,12 +53,12 @@ private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
-    val securityMgr: SecurityManager)
+    val securityMgr: SecurityManager,
+    val conf: SparkConf)
   extends Actor with ActorLogReceive with Logging with LeaderElectable {
 
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
-  val conf = new SparkConf
   val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
 
   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
@@ -121,6 +122,17 @@ private[spark] class Master(
     throw new SparkException("spark.deploy.defaultCores must be positive")
   }
 
+  // Alternative application submission gateway that is stable across Spark versions
+  private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
+  private val restServer =
+    if (restServerEnabled) {
+      val port = conf.getInt("spark.master.rest.port", 6066)
+      Some(new StandaloneRestServer(host, port, self, masterUrl, conf))
+    } else {
+      None
+    }
+  private val restServerBoundPort = restServer.map(_.start())
+
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
     logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
@@ -174,6 +186,7 @@ private[spark] class Master(
       recoveryCompletionTask.cancel()
     }
     webUi.stop()
+    restServer.foreach(_.stop())
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
@@ -421,7 +434,9 @@ private[spark] class Master(
     }
 
     case RequestMasterState => {
-      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+      sender ! MasterStateResponse(
+        host, port, restServerBoundPort,
+        workers.toArray, apps.toArray, completedApps.toArray,
         drivers.toArray, completedDrivers.toArray, state)
     }
 
@@ -429,8 +444,8 @@ private[spark] class Master(
       timeOutDeadWorkers()
     }
 
-    case RequestWebUIPort => {
-      sender ! WebUIPortResponse(webUi.boundPort)
+    case BoundPortsRequest => {
+      sender ! BoundPortsResponse(port, webUi.boundPort, restServerBoundPort)
     }
   }
 
@@ -851,7 +866,7 @@ private[spark] object Master extends Logging {
     SignalLogger.register(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
-    val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
+    val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
     actorSystem.awaitTermination()
   }
 
@@ -875,19 +890,26 @@ private[spark] object Master extends Logging {
     Address(protocol, systemName, host, port)
   }
 
+  /**
+   * Start the Master and return a four tuple of:
+   *   (1) The Master actor system
+   *   (2) The bound port
+   *   (3) The web UI bound port
+   *   (4) The REST server bound port, if any
+   */
   def startSystemAndActor(
       host: String,
       port: Int,
       webUiPort: Int,
-      conf: SparkConf): (ActorSystem, Int, Int) = {
+      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
     val securityMgr = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
       securityManager = securityMgr)
-    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort,
-      securityMgr), actorName)
+    val actor = actorSystem.actorOf(
+      Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
     val timeout = AkkaUtils.askTimeout(conf)
-    val respFuture = actor.ask(RequestWebUIPort)(timeout)
-    val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
-    (actorSystem, boundPort, resp.webUIBoundPort)
+    val portsRequest = actor.ask(BoundPortsRequest)(timeout)
+    val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
+    (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
   }
 }
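
For reference, the two Master-side configuration keys introduced above, shown with their defaults (a sketch; in practice these would be set in spark-defaults.conf or on the Master's SparkConf):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.master.rest.enabled", "true")   // serve the REST gateway (default: true)
      .set("spark.master.rest.port", "6066")      // port the REST server binds to (default: 6066)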

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index db72d8a..15c6296 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -36,7 +36,7 @@ private[master] object MasterMessages {
 
   case object CompleteRecovery
 
-  case object RequestWebUIPort
+  case object BoundPortsRequest
 
-  case class WebUIPortResponse(webUIBoundPort: Int)
+  case class BoundPortsResponse(actorPort: Int, webUIPort: Int, restPort: Option[Int])
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 7ca3b08..b47a081 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -46,19 +46,19 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
 
-    val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
+    val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")
     val workers = state.workers.sortBy(_.id)
     val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
 
-    val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
-      "State", "Duration")
+    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
+      "User", "State", "Duration")
     val activeApps = state.activeApps.sortBy(_.startTime).reverse
     val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
     val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
-    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory",
-      "Main Class")
+    val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
+      "Memory", "Main Class")
     val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
     val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
     val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
@@ -73,6 +73,14 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
           <div class="span12">
             <ul class="unstyled">
               <li><strong>URL:</strong> {state.uri}</li>
+              {
+                state.restUri.map { uri =>
+                  <li>
+                    <strong>REST URL:</strong> {uri}
+                    <span class="rest-uri"> (cluster mode)</span>
+                  </li>
+                }.getOrElse { Seq.empty }
+              }
               <li><strong>Workers:</strong> {state.workers.size}</li>
               <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
                 {state.workers.map(_.coresUsed).sum} Used</li>

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
new file mode 100644
index 0000000..115aa52
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.io.{DataOutputStream, FileNotFoundException}
+import java.net.{HttpURLConnection, SocketException, URL}
+
+import scala.io.Source
+
+import com.fasterxml.jackson.databind.JsonMappingException
+import com.google.common.base.Charsets
+
+import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+
+/**
+ * A client that submits applications to the standalone Master using a REST protocol.
+ * This client is intended to communicate with the [[StandaloneRestServer]] and is
+ * currently used for cluster mode only.
+ *
+ * In protocol version v1, the REST URL takes the form http://[host:port]/v1/submissions/[action],
+ * where [action] can be one of create, kill, or status. Each type of request is represented in
+ * an HTTP message sent to the following prefixes:
+ *   (1) submit - POST to /submissions/create
+ *   (2) kill - POST /submissions/kill/[submissionId]
+ *   (3) status - GET /submissions/status/[submissionId]
+ *
+ * In the case of (1), parameters are posted in the HTTP body in the form of JSON fields.
+ * Otherwise, the URL fully specifies the intended action of the client.
+ *
+ * Since the protocol is expected to be stable across Spark versions, existing fields cannot be
+ * renamed or removed, though new optional fields can be added. In the rare event that forward or
+ * backward compatibility is broken, Spark must introduce a new protocol version (e.g. v2).
+ *
+ * The client and the server must communicate using the same version of the protocol. If there
+ * is a mismatch, the server will respond with the highest protocol version it supports. A future
+ * implementation of this client can use that information to retry using the version specified
+ * by the server.
+ */
+private[spark] class StandaloneRestClient extends Logging {
+  import StandaloneRestClient._
+
+  /**
+   * Submit an application specified by the parameters in the provided request.
+   *
+   * If the submission was successful, poll the status of the submission and report
+   * it to the user. Otherwise, report the error message provided by the server.
+   */
+  def createSubmission(
+      master: String,
+      request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request to launch an application in $master.")
+    validateMaster(master)
+    val url = getSubmitUrl(master)
+    val response = postJson(url, request.toJson)
+    response match {
+      case s: CreateSubmissionResponse =>
+        reportSubmissionStatus(master, s)
+        handleRestResponse(s)
+      case unexpected =>
+        handleUnexpectedRestResponse(unexpected)
+    }
+    response
+  }
+
+  /** Request that the server kill the specified submission. */
+  def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request to kill submission $submissionId in $master.")
+    validateMaster(master)
+    val response = post(getKillUrl(master, submissionId))
+    response match {
+      case k: KillSubmissionResponse => handleRestResponse(k)
+      case unexpected => handleUnexpectedRestResponse(unexpected)
+    }
+    response
+  }
+
+  /** Request the status of a submission from the server. */
+  def requestSubmissionStatus(
+      master: String,
+      submissionId: String,
+      quiet: Boolean = false): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
+    validateMaster(master)
+    val response = get(getStatusUrl(master, submissionId))
+    response match {
+      case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
+      case unexpected => handleUnexpectedRestResponse(unexpected)
+    }
+    response
+  }
+
+  /** Construct a message that captures the specified parameters for submitting an application. */
+  def constructSubmitRequest(
+      appResource: String,
+      mainClass: String,
+      appArgs: Array[String],
+      sparkProperties: Map[String, String],
+      environmentVariables: Map[String, String]): CreateSubmissionRequest = {
+    val message = new CreateSubmissionRequest
+    message.clientSparkVersion = sparkVersion
+    message.appResource = appResource
+    message.mainClass = mainClass
+    message.appArgs = appArgs
+    message.sparkProperties = sparkProperties
+    message.environmentVariables = environmentVariables
+    message.validate()
+    message
+  }
+
+  /** Send a GET request to the specified URL. */
+  private def get(url: URL): SubmitRestProtocolResponse = {
+    logDebug(s"Sending GET request to server at $url.")
+    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+    conn.setRequestMethod("GET")
+    readResponse(conn)
+  }
+
+  /** Send a POST request to the specified URL. */
+  private def post(url: URL): SubmitRestProtocolResponse = {
+    logDebug(s"Sending POST request to server at $url.")
+    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+    conn.setRequestMethod("POST")
+    readResponse(conn)
+  }
+
+  /** Send a POST request with the given JSON as the body to the specified URL. */
+  private def postJson(url: URL, json: String): SubmitRestProtocolResponse = {
+    logDebug(s"Sending POST request to server at $url:\n$json")
+    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+    conn.setRequestMethod("POST")
+    conn.setRequestProperty("Content-Type", "application/json")
+    conn.setRequestProperty("charset", "utf-8")
+    conn.setDoOutput(true)
+    val out = new DataOutputStream(conn.getOutputStream)
+    out.write(json.getBytes(Charsets.UTF_8))
+    out.close()
+    readResponse(conn)
+  }
+
+  /**
+   * Read the response from the server and return it as a validated [[SubmitRestProtocolResponse]].
+   * If the response represents an error, report the embedded message to the user.
+   */
+  private def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
+    try {
+      val responseJson = Source.fromInputStream(connection.getInputStream).mkString
+      logDebug(s"Response from the server:\n$responseJson")
+      val response = SubmitRestProtocolMessage.fromJson(responseJson)
+      response.validate()
+      response match {
+        // If the response is an error, log the message
+        case error: ErrorResponse =>
+          logError(s"Server responded with error:\n${error.message}")
+          error
+        // Otherwise, simply return the response
+        case response: SubmitRestProtocolResponse => response
+        case unexpected =>
+          throw new SubmitRestProtocolException(
+            s"Message received from server was not a response:\n${unexpected.toJson}")
+      }
+    } catch {
+      case unreachable @ (_: FileNotFoundException | _: SocketException) =>
+        throw new SubmitRestConnectionException(
+          s"Unable to connect to server ${connection.getURL}", unreachable)
+      case malformed @ (_: SubmitRestProtocolException | _: JsonMappingException) =>
+        throw new SubmitRestProtocolException(
+          "Malformed response received from server", malformed)
+    }
+  }
+
+  /** Return the REST URL for creating a new submission. */
+  private def getSubmitUrl(master: String): URL = {
+    val baseUrl = getBaseUrl(master)
+    new URL(s"$baseUrl/create")
+  }
+
+  /** Return the REST URL for killing an existing submission. */
+  private def getKillUrl(master: String, submissionId: String): URL = {
+    val baseUrl = getBaseUrl(master)
+    new URL(s"$baseUrl/kill/$submissionId")
+  }
+
+  /** Return the REST URL for requesting the status of an existing submission. */
+  private def getStatusUrl(master: String, submissionId: String): URL = {
+    val baseUrl = getBaseUrl(master)
+    new URL(s"$baseUrl/status/$submissionId")
+  }
+
+  /** Return the base URL for communicating with the server, including the protocol version. */
+  private def getBaseUrl(master: String): String = {
+    val masterUrl = master.stripPrefix("spark://").stripSuffix("/")
+    s"http://$masterUrl/$PROTOCOL_VERSION/submissions"
+  }
+
+  /** Throw an exception if this is not standalone mode. */
+  private def validateMaster(master: String): Unit = {
+    if (!master.startsWith("spark://")) {
+      throw new IllegalArgumentException("This REST client is only supported in standalone mode.")
+    }
+  }
+
+  /** Report the status of a newly created submission. */
+  private def reportSubmissionStatus(
+      master: String,
+      submitResponse: CreateSubmissionResponse): Unit = {
+    if (submitResponse.success) {
+      val submissionId = submitResponse.submissionId
+      if (submissionId != null) {
+        logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
+        pollSubmissionStatus(master, submissionId)
+      } else {
+        // should never happen
+        logError("Application successfully submitted, but submission ID was not provided!")
+      }
+    } else {
+      val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
+      logError("Application submission failed" + failMessage)
+    }
+  }
+
+  /**
+   * Poll the status of the specified submission and log it.
+   * This retries up to a fixed number of times before giving up.
+   */
+  private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
+    (1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
+      val response = requestSubmissionStatus(master, submissionId, quiet = true)
+      val statusResponse = response match {
+        case s: SubmissionStatusResponse => s
+        case _ => return // unexpected type, let upstream caller handle it
+      }
+      if (statusResponse.success) {
+        val driverState = Option(statusResponse.driverState)
+        val workerId = Option(statusResponse.workerId)
+        val workerHostPort = Option(statusResponse.workerHostPort)
+        val exception = Option(statusResponse.message)
+        // Log driver state, if present
+        driverState match {
+          case Some(state) => logInfo(s"State of driver $submissionId is now $state.")
+          case _ => logError(s"State of driver $submissionId was not found!")
+        }
+        // Log worker node, if present
+        (workerId, workerHostPort) match {
+          case (Some(id), Some(hp)) => logInfo(s"Driver is running on worker $id at $hp.")
+          case _ =>
+        }
+        // Log exception stack trace, if present
+        exception.foreach { e => logError(e) }
+        return
+      }
+      Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
+    }
+    logError(s"Error: Master did not recognize driver $submissionId.")
+  }
+
+  /** Log the response sent by the server in the REST application submission protocol. */
+  private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = {
+    logInfo(s"Server responded with ${response.messageType}:\n${response.toJson}")
+  }
+
+  /** Log an appropriate error if the response sent by the server is not of the expected type. */
+  private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
+    logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
+  }
+}
+
+private[spark] object StandaloneRestClient {
+  val REPORT_DRIVER_STATUS_INTERVAL = 1000
+  val REPORT_DRIVER_STATUS_MAX_TRIES = 10
+  val PROTOCOL_VERSION = "v1"
+
+  /** Submit an application, assuming Spark parameters are specified through system properties. */
+  def main(args: Array[String]): Unit = {
+    if (args.size < 2) {
+      sys.error("Usage: StandaloneRestClient [app resource] [main class] [app args*]")
+      sys.exit(1)
+    }
+    val appResource = args(0)
+    val mainClass = args(1)
+    val appArgs = args.slice(2, args.size)
+    val conf = new SparkConf
+    val master = conf.getOption("spark.master").getOrElse {
+      throw new IllegalArgumentException("'spark.master' must be set.")
+    }
+    val sparkProperties = conf.getAll.toMap
+    val environmentVariables = sys.env.filter { case (k, _) => k.startsWith("SPARK_") }
+    val client = new StandaloneRestClient
+    val submitRequest = client.constructSubmitRequest(
+      appResource, mainClass, appArgs, sparkProperties, environmentVariables)
+    client.createSubmission(master, submitRequest)
+  }
+}
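
Putting the URL helpers above together: with a hypothetical master at master-host:6066 and protocol version v1, the client issues POST /v1/submissions/create, POST /v1/submissions/kill/[submissionId], and GET /v1/submissions/status/[submissionId]. A brief sketch of the kill and status calls (the submission ID is illustrative):

    val client = new StandaloneRestClient
    client.killSubmission("spark://master-host:6066", "driver-20150206-0000")
    client.requestSubmissionStatus("spark://master-host:6066", "driver-20150206-0000")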

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
new file mode 100644
index 0000000..2033d67
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.io.{DataOutputStream, File}
+import java.net.InetSocketAddress
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
+
+import scala.io.Source
+
+import akka.actor.ActorRef
+import com.fasterxml.jackson.databind.JsonMappingException
+import com.google.common.base.Charsets
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
+import org.eclipse.jetty.util.thread.QueuedThreadPool
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
+import org.apache.spark.deploy.ClientArguments._
+
+/**
+ * A server that responds to requests submitted by the [[StandaloneRestClient]].
+ * This is intended to be embedded in the standalone Master and used in cluster mode only.
+ *
+ * This server responds with different HTTP codes depending on the situation:
+ *   200 OK - Request was processed successfully
+ *   400 BAD REQUEST - Request was malformed, not successfully validated, or of unexpected type
+ *   468 UNKNOWN PROTOCOL VERSION - Request specified a protocol this server does not understand
+ *   500 INTERNAL SERVER ERROR - Server throws an exception internally while processing the request
+ *
+ * The server always includes a JSON representation of the relevant [[SubmitRestProtocolResponse]]
+ * in the HTTP body. If an error occurs, however, the server will include an [[ErrorResponse]]
+ * instead of the one expected by the client. If the construction of this error response itself
+ * fails, the response will consist of an empty body with a response code that indicates internal
+ * server error.
+ *
+ * @param host the address this server should bind to
+ * @param requestedPort the port this server will attempt to bind to
+ * @param masterActor reference to the Master actor to which requests can be sent
+ * @param masterUrl the URL of the Master that new drivers will attempt to connect to
+ * @param masterConf the conf used by the Master
+ */
+private[spark] class StandaloneRestServer(
+    host: String,
+    requestedPort: Int,
+    masterActor: ActorRef,
+    masterUrl: String,
+    masterConf: SparkConf)
+  extends Logging {
+
+  import StandaloneRestServer._
+
+  private var _server: Option[Server] = None
+  private val baseContext = s"/$PROTOCOL_VERSION/submissions"
+
+  // A mapping from servlets to the URL prefixes they are responsible for
+  private val servletToContext = Map[StandaloneRestServlet, String](
+    new SubmitRequestServlet(masterActor, masterUrl, masterConf) -> s"$baseContext/create/*",
+    new KillRequestServlet(masterActor, masterConf) -> s"$baseContext/kill/*",
+    new StatusRequestServlet(masterActor, masterConf) -> s"$baseContext/status/*",
+    new ErrorServlet -> "/" // default handler
+  )
+
+  /** Start the server and return the bound port. */
+  def start(): Int = {
+    val (server, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
+    _server = Some(server)
+    logInfo(s"Started REST server for submitting applications on port $boundPort")
+    boundPort
+  }
+
+  /**
+   * Map the servlets to their corresponding contexts and attach them to a server.
+   * Return a 2-tuple of the started server and the bound port.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
+    val server = new Server(new InetSocketAddress(host, startPort))
+    val threadPool = new QueuedThreadPool
+    threadPool.setDaemon(true)
+    server.setThreadPool(threadPool)
+    val mainHandler = new ServletContextHandler
+    mainHandler.setContextPath("/")
+    servletToContext.foreach { case (servlet, prefix) =>
+      mainHandler.addServlet(new ServletHolder(servlet), prefix)
+    }
+    server.setHandler(mainHandler)
+    server.start()
+    val boundPort = server.getConnectors()(0).getLocalPort
+    (server, boundPort)
+  }
+
+  def stop(): Unit = {
+    _server.foreach(_.stop())
+  }
+}
+
+private object StandaloneRestServer {
+  val PROTOCOL_VERSION = StandaloneRestClient.PROTOCOL_VERSION
+  val SC_UNKNOWN_PROTOCOL_VERSION = 468
+}
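
To make the endpoint layout described above concrete, here is a rough sketch of querying the server over plain HTTP (illustrative only, not code from this patch; the host, port, and driver ID are placeholders):

    import java.net.{HttpURLConnection, URL}
    import scala.io.Source

    object RestStatusSketch {
      def main(args: Array[String]): Unit = {
        // Endpoints served under the versioned prefix /v1/submissions:
        //   POST /v1/submissions/create      -- submit a driver (JSON CreateSubmissionRequest body)
        //   POST /v1/submissions/kill/[id]   -- kill a previously submitted driver
        //   GET  /v1/submissions/status/[id] -- query the status of a driver
        val url = new URL("http://master:6066/v1/submissions/status/driver-20150206000000-0000")
        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("GET")
        // On success the body is a JSON SubmissionStatusResponse; on failure, an ErrorResponse
        println(s"HTTP ${conn.getResponseCode}")
        println(Source.fromInputStream(conn.getInputStream).mkString)
      }
    }

A request made with an unrecognized protocol version would instead come back with code 468 and the server's highestProtocolVersion set, as handled by the ErrorServlet further below.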
+
+/**
+ * An abstract servlet for handling requests passed to the [[StandaloneRestServer]].
+ */
+private abstract class StandaloneRestServlet extends HttpServlet with Logging {
+
+  /** Service a request. If an exception is thrown in the process, indicate server error. */
+  protected override def service(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    try {
+      super.service(request, response)
+    } catch {
+      case e: Exception =>
+        logError("Exception while handling request", e)
+        response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
+    }
+  }
+
+  /**
+   * Serialize the given response message to JSON and send it through the response servlet.
+   * This validates the response before sending it to ensure it is properly constructed.
+   */
+  protected def sendResponse(
+      responseMessage: SubmitRestProtocolResponse,
+      responseServlet: HttpServletResponse): Unit = {
+    val message = validateResponse(responseMessage, responseServlet)
+    responseServlet.setContentType("application/json")
+    responseServlet.setCharacterEncoding("utf-8")
+    responseServlet.setStatus(HttpServletResponse.SC_OK)
+    val content = message.toJson.getBytes(Charsets.UTF_8)
+    val out = new DataOutputStream(responseServlet.getOutputStream)
+    out.write(content)
+    out.close()
+  }
+
+  /**
+   * Return any fields in the client request message that the server does not know about.
+   *
+   * The mechanism for this is to reconstruct the JSON on the server side and compare the
+   * diff between this JSON and the one generated on the client side. Any fields that are
+   * only in the client JSON are treated as unexpected.
+   */
+  protected def findUnknownFields(
+      requestJson: String,
+      requestMessage: SubmitRestProtocolMessage): Array[String] = {
+    val clientSideJson = parse(requestJson)
+    val serverSideJson = parse(requestMessage.toJson)
+    val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
+    unknown match {
+      case j: JObject => j.obj.map { case (k, _) => k }.toArray
+      case _ => Array.empty[String] // No difference
+    }
+  }
+
+  /** Return a human readable String representation of the exception. */
+  protected def formatException(e: Throwable): String = {
+    val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
+    s"$e\n$stackTraceString"
+  }
+
+  /** Construct an error message to signal the fact that an exception has been thrown. */
+  protected def handleError(message: String): ErrorResponse = {
+    val e = new ErrorResponse
+    e.serverSparkVersion = sparkVersion
+    e.message = message
+    e
+  }
+
+  /**
+   * Validate the response to ensure that it is correctly constructed.
+   *
+   * If it is, simply return the message as is. Otherwise, return an error response instead
+   * to propagate the exception back to the client and set the appropriate error code.
+   */
+  private def validateResponse(
+      responseMessage: SubmitRestProtocolResponse,
+      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+    try {
+      responseMessage.validate()
+      responseMessage
+    } catch {
+      case e: Exception =>
+        responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
+        handleError("Internal server error: " + formatException(e))
+    }
+  }
+}
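
The findUnknownFields helper above leans on json4s's diff; a small illustrative sketch of that mechanism in isolation (not code from this patch; the extra field name is made up):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    object UnknownFieldsSketch {
      def main(args: Array[String]): Unit = {
        // JSON as sent by a hypothetical newer client, carrying a field ("newOption") that
        // this server does not know about, versus the same message as the server
        // re-serializes it after deserialization
        val clientSide = parse("""{"action": "CreateSubmissionRequest", "appResource": "a.jar", "newOption": "x"}""")
        val serverSide = parse("""{"action": "CreateSubmissionRequest", "appResource": "a.jar"}""")
        // The third component of the diff holds values present only on the client side
        val Diff(_, _, unknown) = clientSide.diff(serverSide)
        println(unknown) // a JObject containing only "newOption"
      }
    }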
+
+/**
+ * A servlet for handling kill requests passed to the [[StandaloneRestServer]].
+ */
+private class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
+  extends StandaloneRestServlet {
+
+  /**
+   * If a submission ID is specified in the URL, have the Master kill the corresponding
+   * driver and return an appropriate response to the client. Otherwise, return error.
+   */
+  protected override def doPost(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    val submissionId = Option(request.getPathInfo).getOrElse("").stripPrefix("/")
+    val responseMessage =
+      if (submissionId.nonEmpty) {
+        handleKill(submissionId)
+      } else {
+        response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+        handleError("Submission ID is missing in kill request.")
+      }
+    sendResponse(responseMessage, response)
+  }
+
+  private def handleKill(submissionId: String): KillSubmissionResponse = {
+    val askTimeout = AkkaUtils.askTimeout(conf)
+    val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
+      DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
+    val k = new KillSubmissionResponse
+    k.serverSparkVersion = sparkVersion
+    k.message = response.message
+    k.submissionId = submissionId
+    k.success = response.success
+    k
+  }
+}
+
+/**
+ * A servlet for handling status requests passed to the [[StandaloneRestServer]].
+ */
+private class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
+  extends StandaloneRestServlet {
+
+  /**
+   * If a submission ID is specified in the URL, request the status of the corresponding
+   * driver from the Master and include it in the response. Otherwise, return error.
+   */
+  protected override def doGet(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    val submissionId = Option(request.getPathInfo).getOrElse("").stripPrefix("/")
+    val responseMessage =
+      if (submissionId.nonEmpty) {
+        handleStatus(submissionId)
+      } else {
+        response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+        handleError("Submission ID is missing in status request.")
+      }
+    sendResponse(responseMessage, response)
+  }
+
+  private def handleStatus(submissionId: String): SubmissionStatusResponse = {
+    val askTimeout = AkkaUtils.askTimeout(conf)
+    val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
+      DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
+    val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
+    val d = new SubmissionStatusResponse
+    d.serverSparkVersion = sparkVersion
+    d.submissionId = submissionId
+    d.success = response.found
+    d.driverState = response.state.map(_.toString).orNull
+    d.workerId = response.workerId.orNull
+    d.workerHostPort = response.workerHostPort.orNull
+    d.message = message.orNull
+    d
+  }
+}
+
+/**
+ * A servlet for handling submit requests passed to the [[StandaloneRestServer]].
+ */
+private class SubmitRequestServlet(
+    masterActor: ActorRef,
+    masterUrl: String,
+    conf: SparkConf)
+  extends StandaloneRestServlet {
+
+  /**
+   * Submit an application to the Master with parameters specified in the request.
+   *
+   * The request is assumed to be a [[SubmitRestProtocolRequest]] in the form of JSON.
+   * If the request is successfully processed, return an appropriate response to the
+   * client indicating so. Otherwise, return error instead.
+   */
+  protected override def doPost(
+      requestServlet: HttpServletRequest,
+      responseServlet: HttpServletResponse): Unit = {
+    val responseMessage =
+      try {
+        val requestMessageJson = Source.fromInputStream(requestServlet.getInputStream).mkString
+        val requestMessage = SubmitRestProtocolMessage.fromJson(requestMessageJson)
+        // The request should have already been validated on the client.
+        // In case this is not true, validate it ourselves to avoid potential NPEs.
+        requestMessage.validate()
+        handleSubmit(requestMessageJson, requestMessage, responseServlet)
+      } catch {
+        // The client failed to provide valid JSON, so this is not our fault
+        case e @ (_: JsonMappingException | _: SubmitRestProtocolException) =>
+          responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+          handleError("Malformed request: " + formatException(e))
+      }
+    sendResponse(responseMessage, responseServlet)
+  }
+
+  /**
+   * Handle the submit request and construct an appropriate response to return to the client.
+   *
+   * This assumes that the request message is already successfully validated.
+   * If the request message is not of the expected type, return error to the client.
+   */
+  private def handleSubmit(
+      requestMessageJson: String,
+      requestMessage: SubmitRestProtocolMessage,
+      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+    requestMessage match {
+      case submitRequest: CreateSubmissionRequest =>
+        val askTimeout = AkkaUtils.askTimeout(conf)
+        val driverDescription = buildDriverDescription(submitRequest)
+        val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
+          DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
+        val submitResponse = new CreateSubmissionResponse
+        submitResponse.serverSparkVersion = sparkVersion
+        submitResponse.message = response.message
+        submitResponse.success = response.success
+        submitResponse.submissionId = response.driverId.orNull
+        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
+        if (unknownFields.nonEmpty) {
+          // If there are fields that the server does not know about, warn the client
+          submitResponse.unknownFields = unknownFields
+        }
+        submitResponse
+      case unexpected =>
+        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
+    }
+  }
+
+  /**
+   * Build a driver description from the fields specified in the submit request.
+   *
+   * This involves constructing a command that takes into account memory, java options,
+   * classpath and other settings to launch the driver. This does not currently consider
+   * fields used by python applications since python is not supported in standalone
+   * cluster mode yet.
+   */
+  private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = {
+    // Required fields, including the main class because python is not yet supported
+    val appResource = Option(request.appResource).getOrElse {
+      throw new SubmitRestMissingFieldException("Application jar is missing.")
+    }
+    val mainClass = Option(request.mainClass).getOrElse {
+      throw new SubmitRestMissingFieldException("Main class is missing.")
+    }
+
+    // Optional fields
+    val sparkProperties = request.sparkProperties
+    val driverMemory = sparkProperties.get("spark.driver.memory")
+    val driverCores = sparkProperties.get("spark.driver.cores")
+    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
+    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
+    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
+    val superviseDriver = sparkProperties.get("spark.driver.supervise")
+    val appArgs = request.appArgs
+    val environmentVariables = request.environmentVariables
+
+    // Construct driver description
+    val conf = new SparkConf(false)
+      .setAll(sparkProperties)
+      .set("spark.master", masterUrl)
+    val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
+    val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
+    val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+    val javaOpts = sparkJavaOpts ++ extraJavaOpts
+    val command = new Command(
+      "org.apache.spark.deploy.worker.DriverWrapper",
+      Seq("{{WORKER_URL}}", mainClass) ++ appArgs, // args to the DriverWrapper
+      environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
+    val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
+    val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
+    val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
+    new DriverDescription(
+      appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
+  }
+}
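
The property coercions used by buildDriverDescription above behave roughly as follows (illustrative sketch, not code from this patch; the literal values are placeholders and Utils is private[spark]):

    import org.apache.spark.util.Utils

    object DriverPropertySketch {
      def main(args: Array[String]): Unit = {
        // spark.driver.memory is a memory string, converted to megabytes
        println(Utils.memoryStringToMb("1g"))                    // 1024
        // spark.driver.extraJavaOptions is split on whitespace, respecting quoting
        println(Utils.splitCommandString("-Dfoo=bar -Xss4m"))    // two options: -Dfoo=bar and -Xss4m
        // spark.driver.extraClassPath is split on the platform path separator
        println("/a.jar:/b.jar".split(java.io.File.pathSeparator).mkString(", "))  // /a.jar, /b.jar (on Unix)
      }
    }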
+
+/**
+ * A default servlet that handles error cases that are not captured by other servlets.
+ */
+private class ErrorServlet extends StandaloneRestServlet {
+  private val serverVersion = StandaloneRestServer.PROTOCOL_VERSION
+
+  /** Service a faulty request by returning an appropriate error message to the client. */
+  protected override def service(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    val path = Option(request.getPathInfo).getOrElse("")
+    val parts = path.stripPrefix("/").split("/").filterNot(_.isEmpty).toList
+    var versionMismatch = false
+    var msg =
+      parts match {
+        case Nil =>
+          // http://host:port/
+          "Missing protocol version."
+        case `serverVersion` :: Nil =>
+          // http://host:port/correct-version
+          "Missing the /submissions prefix."
+        case `serverVersion` :: "submissions" :: Nil =>
+          // http://host:port/correct-version/submissions
+          "Missing an action: please specify one of /create, /kill, or /status."
+        case unknownVersion :: _ =>
+          // http://host:port/unknown-version/*
+          versionMismatch = true
+          s"Unknown protocol version '$unknownVersion'."
+        case _ =>
+          // never reached
+          s"Malformed path $path."
+      }
+    msg += s" Please submit requests through http://[host]:[port]/$serverVersion/submissions/..."
+    val error = handleError(msg)
+    // If there is a version mismatch, include the highest protocol version that
+    // this server supports in case the client wants to retry with our version
+    if (versionMismatch) {
+      error.highestProtocolVersion = serverVersion
+      response.setStatus(StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
+    } else {
+      response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+    }
+    sendResponse(error, response)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolException.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolException.scala
new file mode 100644
index 0000000..d7a0bdb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolException.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+/**
+ * An exception thrown in the REST application submission protocol.
+ */
+private[spark] class SubmitRestProtocolException(message: String, cause: Throwable = null)
+  extends Exception(message, cause)
+
+/**
+ * An exception thrown if a field is missing from a [[SubmitRestProtocolMessage]].
+ */
+private[spark] class SubmitRestMissingFieldException(message: String)
+  extends SubmitRestProtocolException(message)
+
+/**
+ * An exception thrown if the REST client cannot reach the REST server.
+ */
+private[spark] class SubmitRestConnectionException(message: String, cause: Throwable)
+  extends SubmitRestProtocolException(message, cause)

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala
new file mode 100644
index 0000000..b877898
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import scala.util.Try
+
+import com.fasterxml.jackson.annotation._
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.util.Utils
+
+/**
+ * An abstract message exchanged in the REST application submission protocol.
+ *
+ * This message is intended to be serialized to and deserialized from JSON in the exchange.
+ * Each message can either be a request or a response and consists of three common fields:
+ *   (1) the action, which fully specifies the type of the message
+ *   (2) the Spark version of the client / server
+ *   (3) an optional message
+ */
+@JsonInclude(Include.NON_NULL)
+@JsonAutoDetect(getterVisibility = Visibility.ANY, setterVisibility = Visibility.ANY)
+@JsonPropertyOrder(alphabetic = true)
+private[spark] abstract class SubmitRestProtocolMessage {
+  @JsonIgnore
+  val messageType = Utils.getFormattedClassName(this)
+
+  val action: String = messageType
+  var message: String = null
+
+  // For JSON deserialization
+  private def setAction(a: String): Unit = { }
+
+  /**
+   * Serialize the message to JSON.
+   * This also ensures that the message is valid and its fields are in the expected format.
+   */
+  def toJson: String = {
+    validate()
+    SubmitRestProtocolMessage.mapper.writeValueAsString(this)
+  }
+
+  /**
+   * Assert the validity of the message.
+   * If the validation fails, throw a [[SubmitRestProtocolException]].
+   */
+  final def validate(): Unit = {
+    try {
+      doValidate()
+    } catch {
+      case e: Exception =>
+        throw new SubmitRestProtocolException(s"Validation of message $messageType failed!", e)
+    }
+  }
+
+  /** Assert the validity of the message */
+  protected def doValidate(): Unit = {
+    if (action == null) {
+      throw new SubmitRestMissingFieldException(s"The action field is missing in $messageType")
+    }
+  }
+
+  /** Assert that the specified field is set in this message. */
+  protected def assertFieldIsSet[T](value: T, name: String): Unit = {
+    if (value == null) {
+      throw new SubmitRestMissingFieldException(s"'$name' is missing in message $messageType.")
+    }
+  }
+
+  /**
+   * Assert a condition when validating this message.
+   * If the assertion fails, throw a [[SubmitRestProtocolException]].
+   */
+  protected def assert(condition: Boolean, failMessage: String): Unit = {
+    if (!condition) { throw new SubmitRestProtocolException(failMessage) }
+  }
+}
+
+/**
+ * Helper methods to process serialized [[SubmitRestProtocolMessage]]s.
+ */
+private[spark] object SubmitRestProtocolMessage {
+  private val packagePrefix = this.getClass.getPackage.getName
+  private val mapper = new ObjectMapper()
+    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    .enable(SerializationFeature.INDENT_OUTPUT)
+    .registerModule(DefaultScalaModule)
+
+  /**
+   * Parse the value of the action field from the given JSON.
+   * If the action field is not found, throw a [[SubmitRestMissingFieldException]].
+   */
+  def parseAction(json: String): String = {
+    parse(json).asInstanceOf[JObject].obj
+      .find { case (f, _) => f == "action" }
+      .map { case (_, v) => v.asInstanceOf[JString].s }
+      .getOrElse {
+        throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
+      }
+  }
+
+  /**
+   * Construct a [[SubmitRestProtocolMessage]] from its JSON representation.
+   *
+   * This method first parses the action from the JSON and uses it to infer the message type.
+   * Note that the action must represent one of the [[SubmitRestProtocolMessage]]s defined in
+   * this package. Otherwise, a [[ClassNotFoundException]] will be thrown.
+   */
+  def fromJson(json: String): SubmitRestProtocolMessage = {
+    val className = parseAction(json)
+    val clazz = Class.forName(packagePrefix + "." + className)
+      .asSubclass[SubmitRestProtocolMessage](classOf[SubmitRestProtocolMessage])
+    fromJson(json, clazz)
+  }
+
+  /**
+   * Construct a [[SubmitRestProtocolMessage]] from its JSON representation.
+   *
+   * This method determines the type of the message from the class provided instead of
+   * inferring it from the action field. This is useful for deserializing JSON that
+   * represents custom user-defined messages.
+   */
+  def fromJson[T <: SubmitRestProtocolMessage](json: String, clazz: Class[T]): T = {
+    mapper.readValue(json, clazz)
+  }
+}
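
A rough sketch of a round trip through these helpers (illustrative only, not code from this patch; the field values are placeholders and the classes are private[spark]):

    import org.apache.spark.deploy.rest._

    object RoundTripSketch {
      def main(args: Array[String]): Unit = {
        val req = new CreateSubmissionRequest
        req.clientSparkVersion = "1.3.0"
        req.appResource = "hdfs:///path/to/app.jar"
        req.mainClass = "com.example.MyApp"
        req.appArgs = Array.empty
        req.environmentVariables = Map.empty
        req.sparkProperties = Map("spark.app.name" -> "MyApp")
        val json = req.toJson              // validates the message, then serializes it
        // The "action" field in the JSON is "CreateSubmissionRequest", so fromJson
        // resolves and instantiates that class without being told the type explicitly
        val parsed = SubmitRestProtocolMessage.fromJson(json)
        assert(parsed.isInstanceOf[CreateSubmissionRequest])
      }
    }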

http://git-wip-us.apache.org/repos/asf/spark/blob/1390e56f/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
new file mode 100644
index 0000000..9e1fd8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import scala.util.Try
+
+import org.apache.spark.util.Utils
+
+/**
+ * An abstract request sent from the client in the REST application submission protocol.
+ */
+private[spark] abstract class SubmitRestProtocolRequest extends SubmitRestProtocolMessage {
+  var clientSparkVersion: String = null
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(clientSparkVersion, "clientSparkVersion")
+  }
+}
+
+/**
+ * A request to launch a new application in the REST application submission protocol.
+ */
+private[spark] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
+  var appResource: String = null
+  var mainClass: String = null
+  var appArgs: Array[String] = null
+  var sparkProperties: Map[String, String] = null
+  var environmentVariables: Map[String, String] = null
+
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assert(sparkProperties != null, "No Spark properties set!")
+    assertFieldIsSet(appResource, "appResource")
+    assertPropertyIsSet("spark.app.name")
+    assertPropertyIsBoolean("spark.driver.supervise")
+    assertPropertyIsNumeric("spark.driver.cores")
+    assertPropertyIsNumeric("spark.cores.max")
+    assertPropertyIsMemory("spark.driver.memory")
+    assertPropertyIsMemory("spark.executor.memory")
+  }
+
+  private def assertPropertyIsSet(key: String): Unit =
+    assertFieldIsSet(sparkProperties.getOrElse(key, null), key)
+
+  private def assertPropertyIsBoolean(key: String): Unit =
+    assertProperty[Boolean](key, "boolean", _.toBoolean)
+
+  private def assertPropertyIsNumeric(key: String): Unit =
+    assertProperty[Int](key, "numeric", _.toInt)
+
+  private def assertPropertyIsMemory(key: String): Unit =
+    assertProperty[Int](key, "memory", Utils.memoryStringToMb)
+
+  /** Assert that a Spark property can be converted to a certain type. */
+  private def assertProperty[T](key: String, valueType: String, convert: (String => T)): Unit = {
+    sparkProperties.get(key).foreach { value =>
+      Try(convert(value)).getOrElse {
+        throw new SubmitRestProtocolException(
+          s"Property '$key' expected $valueType value: actual was '$value'.")
+      }
+    }
+  }
+}
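
As an illustration of the validation above, a property that cannot be coerced to its declared type is rejected before the message is ever sent (rough sketch, not code from this patch; values are placeholders):

    import org.apache.spark.deploy.rest._

    object ValidationSketch {
      def main(args: Array[String]): Unit = {
        val req = new CreateSubmissionRequest
        req.clientSparkVersion = "1.3.0"
        req.appResource = "/path/to/app.jar"
        req.sparkProperties = Map(
          "spark.app.name" -> "MyApp",
          "spark.driver.cores" -> "two")   // not numeric -- validation will reject this
        try {
          req.validate()
        } catch {
          case e: SubmitRestProtocolException =>
            // "Property 'spark.driver.cores' expected numeric value: actual was 'two'."
            println(e.getCause.getMessage)
        }
      }
    }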


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org