You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/04/28 22:34:08 UTC

[1/3] spark git commit: [SPARK-5338] [MESOS] Add cluster mode support for Mesos

Repository: spark
Updated Branches:
  refs/heads/master 80098109d -> 53befacce


http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
new file mode 100644
index 0000000..f28e29e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.mesos
+
+import java.util.Date
+
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.{LocalSparkContext, SparkConf}
+
+
+class MesosClusterSchedulerSuite extends FunSuite with LocalSparkContext with MockitoSugar {
+
+  // Command shared by every driver description submitted in this suite.
+  private val command = new Command("mainClass", Seq("arg"), null, null, null, null)
+
+  /** Builds a scheduler whose start() is overridden to only set `ready`, then starts it. */
+  private def newStartedScheduler(): MesosClusterScheduler = {
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("spark mesos")
+    val scheduler = new MesosClusterScheduler(
+      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+      override def start(): Unit = { ready = true }
+    }
+    scheduler.start()
+    scheduler
+  }
+
+  /** Describes driver "d1"; `id` fills the eighth argument (presumably the submission id). */
+  private def driverDescription(id: String): MesosDriverDescription =
+    new MesosDriverDescription(
+      "d1", "jar", 1000, 1, true, command, Map[String, String](), id, new Date())
+
+  test("can queue drivers") {
+    val scheduler = newStartedScheduler()
+    val first = scheduler.submitDriver(driverDescription("s1"))
+    assert(first.success)
+    val second = scheduler.submitDriver(driverDescription("s2"))
+    assert(second.success)
+    // The queue is expected to list both drivers in the order they were submitted.
+    val state = scheduler.getSchedulerState()
+    val queued = state.queuedDrivers.toList
+    assert(queued(0).submissionId == first.submissionId)
+    assert(queued(1).submissionId == second.submissionId)
+  }
+
+  test("can kill queued drivers") {
+    val scheduler = newStartedScheduler()
+    val submitted = scheduler.submitDriver(driverDescription("s1"))
+    assert(submitted.success)
+    val killed = scheduler.killDriver(submitted.submissionId)
+    assert(killed.success)
+    val state = scheduler.getSchedulerState()
+    assert(state.queuedDrivers.isEmpty)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 594bf78..8f53d82 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -78,6 +78,9 @@ To verify that the Mesos cluster is ready for Spark, navigate to the Mesos maste
 To use Mesos from Spark, you need a Spark binary package available in a place accessible by Mesos, and
 a Spark driver program configured to connect to Mesos.
 
+Alternatively, you can install Spark in the same location on all the Mesos slaves and configure
+`spark.mesos.executor.home` (defaults to SPARK_HOME) to point to that location.
+
 ## Uploading Spark Package
 
 When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary
@@ -107,7 +110,11 @@ the `make-distribution.sh` script included in a Spark source tarball/checkout.
 The Master URLs for Mesos are in the form `mesos://host:5050` for a single-master Mesos
 cluster, or `mesos://zk://host:2181` for a multi-master Mesos cluster using ZooKeeper.
 
-The driver also needs some configuration in `spark-env.sh` to interact properly with Mesos:
+## Client Mode
+
+In client mode, a Spark Mesos framework is launched directly on the client machine and waits for the driver output.
+
+The driver needs some configuration in `spark-env.sh` to interact properly with Mesos:
 
 1. In `spark-env.sh` set some environment variables:
  * `export MESOS_NATIVE_JAVA_LIBRARY=<path to libmesos.so>`. This path is typically
@@ -129,8 +136,7 @@ val sc = new SparkContext(conf)
 {% endhighlight %}
 
 (You can also use [`spark-submit`](submitting-applications.html) and configure `spark.executor.uri`
-in the [conf/spark-defaults.conf](configuration.html#loading-default-configurations) file. Note
-that `spark-submit` currently only supports deploying the Spark driver in `client` mode for Mesos.)
+in the [conf/spark-defaults.conf](configuration.html#loading-default-configurations) file.)
 
 When running a shell, the `spark.executor.uri` parameter is inherited from `SPARK_EXECUTOR_URI`, so
 it does not need to be redundantly passed in as a system property.
@@ -139,6 +145,17 @@ it does not need to be redundantly passed in as a system property.
 ./bin/spark-shell --master mesos://host:5050
 {% endhighlight %}
 
+## Cluster Mode
+
+Spark on Mesos also supports cluster mode, where the driver is launched in the cluster and the client
+can find the results of the driver from the Mesos Web UI.
+
+To use cluster mode, you must start the MesosClusterDispatcher in your cluster via the `sbin/start-mesos-dispatcher.sh` script,
+passing in the Mesos master URL (e.g., `mesos://host:5050`).
+
+From the client, you can submit a job to the Mesos cluster by running `spark-submit` and setting the master URL
+to the URL of the MesosClusterDispatcher (e.g., `mesos://dispatcher:7077`). You can view driver statuses on the
+Spark cluster Web UI.
 
 # Mesos Run Modes
 

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/sbin/start-mesos-dispatcher.sh
----------------------------------------------------------------------
diff --git a/sbin/start-mesos-dispatcher.sh b/sbin/start-mesos-dispatcher.sh
new file mode 100755
index 0000000..ef1fc57
--- /dev/null
+++ b/sbin/start-mesos-dispatcher.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Starts the Mesos Cluster Dispatcher on the machine this script is executed on.
+# The Mesos Cluster Dispatcher is responsible for launching the Mesos framework and
+# REST server to handle driver requests for Mesos cluster mode.
+# Only one cluster dispatcher is needed per Mesos cluster.
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+. "$sbin/spark-config.sh"
+
+. "$SPARK_PREFIX/bin/load-spark-env.sh"
+# Default the dispatcher port to 7077 when SPARK_MESOS_DISPATCHER_PORT is unset or empty.
+if [ "$SPARK_MESOS_DISPATCHER_PORT" = "" ]; then
+  SPARK_MESOS_DISPATCHER_PORT=7077
+fi
+# Default the dispatcher host to the output of `hostname` when none is configured.
+if [ "$SPARK_MESOS_DISPATCHER_HOST" = "" ]; then
+  SPARK_MESOS_DISPATCHER_HOST=`hostname`
+fi
+
+# Host and port are quoted so each is passed as one argument, with no word splitting or globbing.
+"$sbin"/spark-daemon.sh start org.apache.spark.deploy.mesos.MesosClusterDispatcher 1 --host "$SPARK_MESOS_DISPATCHER_HOST" --port "$SPARK_MESOS_DISPATCHER_PORT" "$@"

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/sbin/stop-mesos-dispatcher.sh
----------------------------------------------------------------------
diff --git a/sbin/stop-mesos-dispatcher.sh b/sbin/stop-mesos-dispatcher.sh
new file mode 100755
index 0000000..cb65d95
--- /dev/null
+++ b/sbin/stop-mesos-dispatcher.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Stops the Mesos Cluster Dispatcher on the machine this script is executed on.
+
+sbin="$(dirname "$0")"
+sbin="$(cd "$sbin"; pwd)"
+
+. "$sbin/spark-config.sh"
+# The trailing 1 mirrors the start script's argument (presumably the daemon instance number).
+"$sbin/spark-daemon.sh" stop org.apache.spark.deploy.mesos.MesosClusterDispatcher 1
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/3] spark git commit: [SPARK-5338] [MESOS] Add cluster mode support for Mesos

Posted by an...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 2d6b8d4..502b9bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -18,26 +18,16 @@
 package org.apache.spark.deploy.rest
 
 import java.io.File
-import java.net.InetSocketAddress
-import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-
-import scala.io.Source
+import javax.servlet.http.HttpServletResponse
 
 import akka.actor.ActorRef
-import com.fasterxml.jackson.core.JsonProcessingException
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
-import org.eclipse.jetty.util.thread.QueuedThreadPool
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
-import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
-import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
 import org.apache.spark.deploy.ClientArguments._
+import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
+import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
 
 /**
- * A server that responds to requests submitted by the [[StandaloneRestClient]].
+ * A server that responds to requests submitted by the [[RestSubmissionClient]].
  * This is intended to be embedded in the standalone Master and used in cluster mode only.
  *
  * This server responds with different HTTP codes depending on the situation:
@@ -54,173 +44,31 @@ import org.apache.spark.deploy.ClientArguments._
  *
  * @param host the address this server should bind to
  * @param requestedPort the port this server will attempt to bind to
+ * @param masterConf the conf used by the Master
  * @param masterActor reference to the Master actor to which requests can be sent
  * @param masterUrl the URL of the Master new drivers will attempt to connect to
- * @param masterConf the conf used by the Master
  */
 private[deploy] class StandaloneRestServer(
     host: String,
     requestedPort: Int,
+    masterConf: SparkConf,
     masterActor: ActorRef,
-    masterUrl: String,
-    masterConf: SparkConf)
-  extends Logging {
-
-  import StandaloneRestServer._
-
-  private var _server: Option[Server] = None
-
-  // A mapping from URL prefixes to servlets that serve them. Exposed for testing.
-  protected val baseContext = s"/$PROTOCOL_VERSION/submissions"
-  protected val contextToServlet = Map[String, StandaloneRestServlet](
-    s"$baseContext/create/*" -> new SubmitRequestServlet(masterActor, masterUrl, masterConf),
-    s"$baseContext/kill/*" -> new KillRequestServlet(masterActor, masterConf),
-    s"$baseContext/status/*" -> new StatusRequestServlet(masterActor, masterConf),
-    "/*" -> new ErrorServlet // default handler
-  )
-
-  /** Start the server and return the bound port. */
-  def start(): Int = {
-    val (server, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
-    _server = Some(server)
-    logInfo(s"Started REST server for submitting applications on port $boundPort")
-    boundPort
-  }
-
-  /**
-   * Map the servlets to their corresponding contexts and attach them to a server.
-   * Return a 2-tuple of the started server and the bound port.
-   */
-  private def doStart(startPort: Int): (Server, Int) = {
-    val server = new Server(new InetSocketAddress(host, startPort))
-    val threadPool = new QueuedThreadPool
-    threadPool.setDaemon(true)
-    server.setThreadPool(threadPool)
-    val mainHandler = new ServletContextHandler
-    mainHandler.setContextPath("/")
-    contextToServlet.foreach { case (prefix, servlet) =>
-      mainHandler.addServlet(new ServletHolder(servlet), prefix)
-    }
-    server.setHandler(mainHandler)
-    server.start()
-    val boundPort = server.getConnectors()(0).getLocalPort
-    (server, boundPort)
-  }
-
-  def stop(): Unit = {
-    _server.foreach(_.stop())
-  }
-}
-
-private[rest] object StandaloneRestServer {
-  val PROTOCOL_VERSION = StandaloneRestClient.PROTOCOL_VERSION
-  val SC_UNKNOWN_PROTOCOL_VERSION = 468
-}
-
-/**
- * An abstract servlet for handling requests passed to the [[StandaloneRestServer]].
- */
-private[rest] abstract class StandaloneRestServlet extends HttpServlet with Logging {
-
-  /**
-   * Serialize the given response message to JSON and send it through the response servlet.
-   * This validates the response before sending it to ensure it is properly constructed.
-   */
-  protected def sendResponse(
-      responseMessage: SubmitRestProtocolResponse,
-      responseServlet: HttpServletResponse): Unit = {
-    val message = validateResponse(responseMessage, responseServlet)
-    responseServlet.setContentType("application/json")
-    responseServlet.setCharacterEncoding("utf-8")
-    responseServlet.getWriter.write(message.toJson)
-  }
-
-  /**
-   * Return any fields in the client request message that the server does not know about.
-   *
-   * The mechanism for this is to reconstruct the JSON on the server side and compare the
-   * diff between this JSON and the one generated on the client side. Any fields that are
-   * only in the client JSON are treated as unexpected.
-   */
-  protected def findUnknownFields(
-      requestJson: String,
-      requestMessage: SubmitRestProtocolMessage): Array[String] = {
-    val clientSideJson = parse(requestJson)
-    val serverSideJson = parse(requestMessage.toJson)
-    val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
-    unknown match {
-      case j: JObject => j.obj.map { case (k, _) => k }.toArray
-      case _ => Array.empty[String] // No difference
-    }
-  }
-
-  /** Return a human readable String representation of the exception. */
-  protected def formatException(e: Throwable): String = {
-    val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
-    s"$e\n$stackTraceString"
-  }
-
-  /** Construct an error message to signal the fact that an exception has been thrown. */
-  protected def handleError(message: String): ErrorResponse = {
-    val e = new ErrorResponse
-    e.serverSparkVersion = sparkVersion
-    e.message = message
-    e
-  }
-
-  /**
-   * Parse a submission ID from the relative path, assuming it is the first part of the path.
-   * For instance, we expect the path to take the form /[submission ID]/maybe/something/else.
-   * The returned submission ID cannot be empty. If the path is unexpected, return None.
-   */
-  protected def parseSubmissionId(path: String): Option[String] = {
-    if (path == null || path.isEmpty) {
-      None
-    } else {
-      path.stripPrefix("/").split("/").headOption.filter(_.nonEmpty)
-    }
-  }
-
-  /**
-   * Validate the response to ensure that it is correctly constructed.
-   *
-   * If it is, simply return the message as is. Otherwise, return an error response instead
-   * to propagate the exception back to the client and set the appropriate error code.
-   */
-  private def validateResponse(
-      responseMessage: SubmitRestProtocolResponse,
-      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
-    try {
-      responseMessage.validate()
-      responseMessage
-    } catch {
-      case e: Exception =>
-        responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
-        handleError("Internal server error: " + formatException(e))
-    }
-  }
+    masterUrl: String)
+  extends RestSubmissionServer(host, requestedPort, masterConf) {
+
+  protected override val submitRequestServlet =
+    new StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf)
+  protected override val killRequestServlet =
+    new StandaloneKillRequestServlet(masterActor, masterConf)
+  protected override val statusRequestServlet =
+    new StandaloneStatusRequestServlet(masterActor, masterConf)
 }
 
 /**
  * A servlet for handling kill requests passed to the [[StandaloneRestServer]].
  */
-private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
-  extends StandaloneRestServlet {
-
-  /**
-   * If a submission ID is specified in the URL, have the Master kill the corresponding
-   * driver and return an appropriate response to the client. Otherwise, return error.
-   */
-  protected override def doPost(
-      request: HttpServletRequest,
-      response: HttpServletResponse): Unit = {
-    val submissionId = parseSubmissionId(request.getPathInfo)
-    val responseMessage = submissionId.map(handleKill).getOrElse {
-      response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
-      handleError("Submission ID is missing in kill request.")
-    }
-    sendResponse(responseMessage, response)
-  }
+private[rest] class StandaloneKillRequestServlet(masterActor: ActorRef, conf: SparkConf)
+  extends KillRequestServlet {
 
   protected def handleKill(submissionId: String): KillSubmissionResponse = {
     val askTimeout = RpcUtils.askTimeout(conf)
@@ -238,23 +86,8 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
 /**
  * A servlet for handling status requests passed to the [[StandaloneRestServer]].
  */
-private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
-  extends StandaloneRestServlet {
-
-  /**
-   * If a submission ID is specified in the URL, request the status of the corresponding
-   * driver from the Master and include it in the response. Otherwise, return error.
-   */
-  protected override def doGet(
-      request: HttpServletRequest,
-      response: HttpServletResponse): Unit = {
-    val submissionId = parseSubmissionId(request.getPathInfo)
-    val responseMessage = submissionId.map(handleStatus).getOrElse {
-      response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
-      handleError("Submission ID is missing in status request.")
-    }
-    sendResponse(responseMessage, response)
-  }
+private[rest] class StandaloneStatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
+  extends StatusRequestServlet {
 
   protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
     val askTimeout = RpcUtils.askTimeout(conf)
@@ -276,71 +109,11 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
 /**
  * A servlet for handling submit requests passed to the [[StandaloneRestServer]].
  */
-private[rest] class SubmitRequestServlet(
+private[rest] class StandaloneSubmitRequestServlet(
     masterActor: ActorRef,
     masterUrl: String,
     conf: SparkConf)
-  extends StandaloneRestServlet {
-
-  /**
-   * Submit an application to the Master with parameters specified in the request.
-   *
-   * The request is assumed to be a [[SubmitRestProtocolRequest]] in the form of JSON.
-   * If the request is successfully processed, return an appropriate response to the
-   * client indicating so. Otherwise, return error instead.
-   */
-  protected override def doPost(
-      requestServlet: HttpServletRequest,
-      responseServlet: HttpServletResponse): Unit = {
-    val responseMessage =
-      try {
-        val requestMessageJson = Source.fromInputStream(requestServlet.getInputStream).mkString
-        val requestMessage = SubmitRestProtocolMessage.fromJson(requestMessageJson)
-        // The response should have already been validated on the client.
-        // In case this is not true, validate it ourselves to avoid potential NPEs.
-        requestMessage.validate()
-        handleSubmit(requestMessageJson, requestMessage, responseServlet)
-      } catch {
-        // The client failed to provide a valid JSON, so this is not our fault
-        case e @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
-          responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
-          handleError("Malformed request: " + formatException(e))
-      }
-    sendResponse(responseMessage, responseServlet)
-  }
-
-  /**
-   * Handle the submit request and construct an appropriate response to return to the client.
-   *
-   * This assumes that the request message is already successfully validated.
-   * If the request message is not of the expected type, return error to the client.
-   */
-  private def handleSubmit(
-      requestMessageJson: String,
-      requestMessage: SubmitRestProtocolMessage,
-      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
-    requestMessage match {
-      case submitRequest: CreateSubmissionRequest =>
-        val askTimeout = RpcUtils.askTimeout(conf)
-        val driverDescription = buildDriverDescription(submitRequest)
-        val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
-          DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
-        val submitResponse = new CreateSubmissionResponse
-        submitResponse.serverSparkVersion = sparkVersion
-        submitResponse.message = response.message
-        submitResponse.success = response.success
-        submitResponse.submissionId = response.driverId.orNull
-        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
-        if (unknownFields.nonEmpty) {
-          // If there are fields that the server does not know about, warn the client
-          submitResponse.unknownFields = unknownFields
-        }
-        submitResponse
-      case unexpected =>
-        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
-        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
-    }
-  }
+  extends SubmitRequestServlet {
 
   /**
    * Build a driver description from the fields specified in the submit request.
@@ -389,50 +162,37 @@ private[rest] class SubmitRequestServlet(
     new DriverDescription(
       appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
   }
-}
 
-/**
- * A default servlet that handles error cases that are not captured by other servlets.
- */
-private class ErrorServlet extends StandaloneRestServlet {
-  private val serverVersion = StandaloneRestServer.PROTOCOL_VERSION
-
-  /** Service a faulty request by returning an appropriate error message to the client. */
-  protected override def service(
-      request: HttpServletRequest,
-      response: HttpServletResponse): Unit = {
-    val path = request.getPathInfo
-    val parts = path.stripPrefix("/").split("/").filter(_.nonEmpty).toList
-    var versionMismatch = false
-    var msg =
-      parts match {
-        case Nil =>
-          // http://host:port/
-          "Missing protocol version."
-        case `serverVersion` :: Nil =>
-          // http://host:port/correct-version
-          "Missing the /submissions prefix."
-        case `serverVersion` :: "submissions" :: tail =>
-          // http://host:port/correct-version/submissions/*
-          "Missing an action: please specify one of /create, /kill, or /status."
-        case unknownVersion :: tail =>
-          // http://host:port/unknown-version/*
-          versionMismatch = true
-          s"Unknown protocol version '$unknownVersion'."
-        case _ =>
-          // never reached
-          s"Malformed path $path."
-      }
-    msg += s" Please submit requests through http://[host]:[port]/$serverVersion/submissions/..."
-    val error = handleError(msg)
-    // If there is a version mismatch, include the highest protocol version that
-    // this server supports in case the client wants to retry with our version
-    if (versionMismatch) {
-      error.highestProtocolVersion = serverVersion
-      response.setStatus(StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
-    } else {
-      response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+  /**
+   * Handle the submit request and construct an appropriate response to return to the client.
+   *
+   * This assumes that the request message is already successfully validated.
+   * If the request message is not of the expected type, return error to the client.
+   */
+  protected override def handleSubmit(
+      requestMessageJson: String,
+      requestMessage: SubmitRestProtocolMessage,
+      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+    requestMessage match {
+      case submitRequest: CreateSubmissionRequest =>
+        val askTimeout = RpcUtils.askTimeout(conf)
+        val driverDescription = buildDriverDescription(submitRequest)
+        val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
+          DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
+        val submitResponse = new CreateSubmissionResponse
+        submitResponse.serverSparkVersion = sparkVersion
+        submitResponse.message = response.message
+        submitResponse.success = response.success
+        submitResponse.submissionId = response.driverId.orNull
+        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
+        if (unknownFields.nonEmpty) {
+          // If there are fields that the server does not know about, warn the client
+          submitResponse.unknownFields = unknownFields
+        }
+        submitResponse
+      case unexpected =>
+        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
     }
-    sendResponse(error, response)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
index d80abdf..0d50a76 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
@@ -61,7 +61,7 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
     assertProperty[Boolean](key, "boolean", _.toBoolean)
 
   private def assertPropertyIsNumeric(key: String): Unit =
-    assertProperty[Int](key, "numeric", _.toInt)
+    assertProperty[Double](key, "numeric", _.toDouble)
 
   private def assertPropertyIsMemory(key: String): Unit =
     assertProperty[Int](key, "memory", Utils.memoryStringToMb)

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
index 8fde8c1..0e226ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
@@ -35,7 +35,7 @@ private[rest] abstract class SubmitRestProtocolResponse extends SubmitRestProtoc
 /**
  * A response to a [[CreateSubmissionRequest]] in the REST application submission protocol.
  */
-private[rest] class CreateSubmissionResponse extends SubmitRestProtocolResponse {
+private[spark] class CreateSubmissionResponse extends SubmitRestProtocolResponse {
   var submissionId: String = null
   protected override def doValidate(): Unit = {
     super.doValidate()
@@ -46,7 +46,7 @@ private[rest] class CreateSubmissionResponse extends SubmitRestProtocolResponse
 /**
  * A response to a kill request in the REST application submission protocol.
  */
-private[rest] class KillSubmissionResponse extends SubmitRestProtocolResponse {
+private[spark] class KillSubmissionResponse extends SubmitRestProtocolResponse {
   var submissionId: String = null
   protected override def doValidate(): Unit = {
     super.doValidate()
@@ -58,7 +58,7 @@ private[rest] class KillSubmissionResponse extends SubmitRestProtocolResponse {
 /**
  * A response to a status request in the REST application submission protocol.
  */
-private[rest] class SubmissionStatusResponse extends SubmitRestProtocolResponse {
+private[spark] class SubmissionStatusResponse extends SubmitRestProtocolResponse {
   var submissionId: String = null
   var driverState: String = null
   var workerId: String = null

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
new file mode 100644
index 0000000..fd17a98
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest.mesos
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.util.concurrent.atomic.AtomicLong
+import javax.servlet.http.HttpServletResponse
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.rest._
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+import org.apache.spark.util.Utils
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+
+
+/**
+ * A server that responds to requests submitted by the [[RestSubmissionClient]].
+ * All requests are forwarded to
+ * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
+ * This is intended to be used in Mesos cluster mode only.
+ * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs.
+ */
+private[spark] class MesosRestServer(
+    host: String,
+    requestedPort: Int,
+    masterConf: SparkConf,
+    scheduler: MesosClusterScheduler)
+  extends RestSubmissionServer(host, requestedPort, masterConf) {
+
+  protected override val submitRequestServlet =
+    new MesosSubmitRequestServlet(scheduler, masterConf)
+  protected override val killRequestServlet =
+    new MesosKillRequestServlet(scheduler, masterConf)
+  protected override val statusRequestServlet =
+    new MesosStatusRequestServlet(scheduler, masterConf)
+}
+
+private[deploy] class MesosSubmitRequestServlet(
+    scheduler: MesosClusterScheduler,
+    conf: SparkConf)
+  extends SubmitRequestServlet {
+
+  private val DEFAULT_SUPERVISE = false
+  private val DEFAULT_MEMORY = 512 // mb
+  private val DEFAULT_CORES = 1.0
+
+  private val nextDriverNumber = new AtomicLong(0)
+  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
+  private def newDriverId(submitDate: Date): String = {
+    "driver-%s-%04d".format(
+      createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet())
+  }
+
+  /**
+   * Build a driver description from the fields specified in the submit request.
+   *
+   * This involves constructing a command that launches a mesos framework for the job.
+   * This does not currently consider fields used by python applications since python
+   * is not supported in mesos cluster mode yet.
+   */
+  private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
+    // Required fields, including the main class because python is not yet supported
+    val appResource = Option(request.appResource).getOrElse {
+      throw new SubmitRestMissingFieldException("Application jar is missing.")
+    }
+    val mainClass = Option(request.mainClass).getOrElse {
+      throw new SubmitRestMissingFieldException("Main class is missing.")
+    }
+
+    // Optional fields
+    val sparkProperties = request.sparkProperties
+    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
+    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
+    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
+    val superviseDriver = sparkProperties.get("spark.driver.supervise")
+    val driverMemory = sparkProperties.get("spark.driver.memory")
+    val driverCores = sparkProperties.get("spark.driver.cores")
+    val appArgs = request.appArgs
+    val environmentVariables = request.environmentVariables
+    val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
+
+    // Construct driver description
+    val conf = new SparkConf(false).setAll(sparkProperties)
+    val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
+    val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
+    val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+    val javaOpts = sparkJavaOpts ++ extraJavaOpts
+    val command = new Command(
+      mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
+    val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
+    val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
+    val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
+    val submitDate = new Date()
+    val submissionId = newDriverId(submitDate)
+
+    new MesosDriverDescription(
+      name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
+      command, request.sparkProperties, submissionId, submitDate)
+  }
+
+  protected override def handleSubmit(
+      requestMessageJson: String,
+      requestMessage: SubmitRestProtocolMessage,
+      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+    requestMessage match {
+      case submitRequest: CreateSubmissionRequest =>
+        val driverDescription = buildDriverDescription(submitRequest)
+        val s = scheduler.submitDriver(driverDescription)
+        s.serverSparkVersion = sparkVersion
+        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
+        if (unknownFields.nonEmpty) {
+          // If there are fields that the server does not know about, warn the client
+          s.unknownFields = unknownFields
+        }
+        s
+      case unexpected =>
+        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
+    }
+  }
+}
+
+private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+  extends KillRequestServlet {
+  protected override def handleKill(submissionId: String): KillSubmissionResponse = {
+    val k = scheduler.killDriver(submissionId)
+    k.serverSparkVersion = sparkVersion
+    k
+  }
+}
+
+private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+  extends StatusRequestServlet {
+  protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
+    val d = scheduler.getDriverStatus(submissionId)
+    d.serverSparkVersion = sparkVersion
+    d
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 82f652d..3412301 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,20 +18,17 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{List => JList}
-import java.util.Collections
+import java.util.{Collections, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException, TaskState}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.{Scheduler => MScheduler, _}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -49,17 +46,10 @@ private[spark] class CoarseMesosSchedulerBackend(
     master: String)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
   with MScheduler
-  with Logging {
+  with MesosSchedulerUtils {
 
   val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
 
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
-
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max",  Int.MaxValue.toString).toInt
 
@@ -87,26 +77,8 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def start() {
     super.start()
-
-    synchronized {
-      new Thread("CoarseMesosSchedulerBackend driver") {
-        setDaemon(true)
-        override def run() {
-          val scheduler = CoarseMesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try { {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          }
-          } catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      waitForRegister()
-    }
+    val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
+    startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
   }
 
   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
@@ -150,8 +122,10 @@ private[spark] class CoarseMesosSchedulerBackend(
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
 
-    val uri = conf.get("spark.executor.uri", null)
-    if (uri == null) {
+    val uri = conf.getOption("spark.executor.uri")
+      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+
+    if (uri.isEmpty) {
       val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
       command.setValue(
         "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
@@ -164,7 +138,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
-      val basename = uri.split('/').last.split('.').head
+      val basename = uri.get.split('/').last.split('.').head
       command.setValue(
         s"cd $basename*; $prefixEnv " +
          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
@@ -173,7 +147,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         s" --hostname ${offer.getHostname}" +
         s" --cores $numCores" +
         s" --app-id $appId")
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
     }
     command.build()
   }
@@ -183,18 +157,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
     appId = frameworkId.getValue
     logInfo("Registered as framework ID " + appId)
-    registeredLock.synchronized {
-      isRegistered = true
-      registeredLock.notifyAll()
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
-    }
+    markRegistered()
   }
 
   override def disconnected(d: SchedulerDriver) {}
@@ -245,14 +208,6 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
   }
 
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  private def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    0
-  }
-
   /** Build a Mesos resource protobuf object */
   private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
     Resource.newBuilder()
@@ -284,7 +239,8 @@ private[spark] class CoarseMesosSchedulerBackend(
                 "is Spark installed on it?")
           }
         }
-        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+        // In case we'd rejected everything before but have now lost a node
+        mesosDriver.reviveOffers()
       }
     }
   }
@@ -296,8 +252,8 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def stop() {
     super.stop()
-    if (driver != null) {
-      driver.stop()
+    if (mesosDriver != null) {
+      mesosDriver.stop()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
new file mode 100644
index 0000000..3efc536
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConversions._
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException.NoNodeException
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkCuratorUtil
+import org.apache.spark.util.Utils
+
+/**
+ * Persistence engine factory that is responsible for creating new persistence engines
+ * to store Mesos cluster mode state.
+ */
+private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) {
+  def createEngine(path: String): MesosClusterPersistenceEngine
+}
+
+/**
+ * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode
+ * specific state, so that on failover all the state can be recovered and the scheduler
+ * can resume managing the drivers.
+ */
+private[spark] trait MesosClusterPersistenceEngine {
+  def persist(name: String, obj: Object): Unit
+  def expunge(name: String): Unit
+  def fetch[T](name: String): Option[T]
+  def fetchAll[T](): Iterable[T]
+}
+
+/**
+ * Zookeeper backed persistence engine factory.
+ * All Zk engines created from this factory share the same Zookeeper client, so
+ * all of them reuse the same connection pool.
+ */
+private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf)
+  extends MesosClusterPersistenceEngineFactory(conf) {
+
+  lazy val zk = SparkCuratorUtil.newClient(conf, "spark.mesos.deploy.zookeeper.url")
+
+  def createEngine(path: String): MesosClusterPersistenceEngine = {
+    new ZookeeperMesosClusterPersistenceEngine(path, zk, conf)
+  }
+}
+
+/**
+ * Black hole persistence engine factory that creates black hole
+ * persistence engines, which store nothing.
+ */
+private[spark] class BlackHoleMesosClusterPersistenceEngineFactory
+  extends MesosClusterPersistenceEngineFactory(null) {
+  def createEngine(path: String): MesosClusterPersistenceEngine = {
+    new BlackHoleMesosClusterPersistenceEngine
+  }
+}
+
+/**
+ * Black hole persistence engine that stores nothing.
+ */
+private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
+  override def persist(name: String, obj: Object): Unit = {}
+  override def fetch[T](name: String): Option[T] = None
+  override def expunge(name: String): Unit = {}
+  override def fetchAll[T](): Iterable[T] = Iterable.empty[T]
+}
+
+/**
+ * Zookeeper based Mesos cluster persistence engine, that stores cluster mode state
+ * into Zookeeper. Each engine object is operating under one folder in Zookeeper, but
+ * reuses a shared Zookeeper client.
+ */
+private[spark] class ZookeeperMesosClusterPersistenceEngine(
+    baseDir: String,
+    zk: CuratorFramework,
+    conf: SparkConf)
+  extends MesosClusterPersistenceEngine with Logging {
+  private val WORKING_DIR =
+    conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
+
+  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
+
+  def path(name: String): String = {
+    WORKING_DIR + "/" + name
+  }
+
+  override def expunge(name: String): Unit = {
+    zk.delete().forPath(path(name))
+  }
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = Utils.serialize(obj)
+    val zkPath = path(name)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized)
+  }
+
+  override def fetch[T](name: String): Option[T] = {
+    val zkPath = path(name)
+
+    try {
+      val fileData = zk.getData().forPath(zkPath)
+      Some(Utils.deserialize[T](fileData))
+    } catch {
+      case e: NoNodeException => None
+      case e: Exception => {
+        logWarning("Exception while reading persisted file, deleting", e)
+        zk.delete().forPath(zkPath)
+        None
+      }
+    }
+  }
+
+  override def fetchAll[T](): Iterable[T] = {
+    zk.getChildren.forPath(WORKING_DIR).map(fetch[T]).flatten
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
new file mode 100644
index 0000000..0396e62
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.concurrent.locks.ReentrantLock
+import java.util.{Collections, Date, List => JList}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.mesos.Protos.Environment.Variable
+import org.apache.mesos.Protos.TaskStatus.Reason
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.{Scheduler, SchedulerDriver}
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.Utils
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
+
+
+/**
+ * Tracks the current state of a Mesos Task that runs a Spark driver.
+ * @param driverDescription Submitted driver description from
+ * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]]
+ * @param taskId Mesos TaskID generated for the task
+ * @param slaveId Slave ID that the task is assigned to
+ * @param mesosTaskStatus The last known task status update.
+ * @param startDate The date the task was launched
+ */
+private[spark] class MesosClusterSubmissionState(
+    val driverDescription: MesosDriverDescription,
+    val taskId: TaskID,
+    val slaveId: SlaveID,
+    var mesosTaskStatus: Option[TaskStatus],
+    var startDate: Date)
+  extends Serializable {
+
+  def copy(): MesosClusterSubmissionState = {
+    new MesosClusterSubmissionState(
+      driverDescription, taskId, slaveId, mesosTaskStatus, startDate)
+  }
+}
+
+/**
+ * Tracks the retry state of a driver, which includes the next time it should be scheduled
+ * and necessary information to do exponential backoff.
+ * This class is not thread-safe, and we expect the caller to handle synchronizing state.
+ * @param lastFailureStatus Last Task status when it failed.
+ * @param retries Number of times it has been retried.
+ * @param nextRetry Time at which it should be retried next
+ * @param waitTime The amount of time the driver is scheduled to wait until the next retry.
+ */
+private[spark] class MesosClusterRetryState(
+    val lastFailureStatus: TaskStatus,
+    val retries: Int,
+    val nextRetry: Date,
+    val waitTime: Int) extends Serializable {
+  def copy(): MesosClusterRetryState =
+    new MesosClusterRetryState(lastFailureStatus, retries, nextRetry, waitTime)
+}
+
+/**
+ * The full state of the cluster scheduler, currently being used for displaying
+ * information on the UI.
+ * @param frameworkId Mesos Framework id for the cluster scheduler.
+ * @param masterUrl The Mesos master url
+ * @param queuedDrivers All drivers queued to be launched
+ * @param launchedDrivers All launched or running drivers
+ * @param finishedDrivers All terminated drivers
+ * @param pendingRetryDrivers All drivers pending to be retried
+ */
+private[spark] class MesosClusterSchedulerState(
+    val frameworkId: String,
+    val masterUrl: Option[String],
+    val queuedDrivers: Iterable[MesosDriverDescription],
+    val launchedDrivers: Iterable[MesosClusterSubmissionState],
+    val finishedDrivers: Iterable[MesosClusterSubmissionState],
+    val pendingRetryDrivers: Iterable[MesosDriverDescription])
+
+/**
+ * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
+ * as Mesos tasks in a Mesos cluster.
+ * All drivers are launched asynchronously by the framework, and will eventually be launched
+ * by one of the slaves in the cluster. The results of the driver will be stored in slave's task
+ * sandbox which is accessible by visiting the Mesos UI.
+ * This scheduler supports recovery by persisting all its state and performs task reconciliation
+ * on recovery, which gets all the latest state for all the drivers from Mesos master.
+ */
+private[spark] class MesosClusterScheduler(
+    engineFactory: MesosClusterPersistenceEngineFactory,
+    conf: SparkConf)
+  extends Scheduler with MesosSchedulerUtils {
+  var frameworkUrl: String = _
+  private val metricsSystem =
+    MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
+  private val master = conf.get("spark.master")
+  private val appName = conf.get("spark.app.name")
+  private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
+  private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
+  private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
+  private val schedulerState = engineFactory.createEngine("scheduler")
+  private val stateLock = new ReentrantLock()
+  private val finishedDrivers =
+    new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
+  private var frameworkId: String = null
+  // Holds all the launched drivers and current launch state, keyed by driver id.
+  private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
+  // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
+  // All drivers that are loaded after failover are added here, as we need to get the latest
+  // state of the tasks from Mesos.
+  private val pendingRecover = new mutable.HashMap[String, SlaveID]()
+  // Stores all the submitted drivers that haven't been launched.
+  private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
+  // All supervised drivers that are waiting to retry after termination.
+  private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
+  private val queuedDriversState = engineFactory.createEngine("driverQueue")
+  private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
+  private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  // Flag to mark if the scheduler is ready to be called, which is not the case until the
+  // scheduler is registered with Mesos master.
+  @volatile protected var ready = false
+  private var masterInfo: Option[MasterInfo] = None
+
+  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
+    val c = new CreateSubmissionResponse
+    if (!ready) {
+      c.success = false
+      c.message = "Scheduler is not ready to take requests"
+      return c
+    }
+
+    stateLock.synchronized {
+      if (isQueueFull()) {
+        c.success = false
+        c.message = "Already reached maximum submission size"
+        return c
+      }
+      c.submissionId = desc.submissionId
+      queuedDriversState.persist(desc.submissionId, desc)
+      queuedDrivers += desc
+      c.success = true
+    }
+    c
+  }
+
+  def killDriver(submissionId: String): KillSubmissionResponse = {
+    val k = new KillSubmissionResponse
+    if (!ready) {
+      k.success = false
+      k.message = "Scheduler is not ready to take requests"
+      return k
+    }
+    k.submissionId = submissionId
+    stateLock.synchronized {
+      // We look for the requested driver in the following places:
+      // 1. Check if submission is running or launched.
+      // 2. Check if it's still queued.
+      // 3. Check if it's in the retry list.
+      // 4. Check if it has already completed.
+      if (launchedDrivers.contains(submissionId)) {
+        val task = launchedDrivers(submissionId)
+        mesosDriver.killTask(task.taskId)
+        k.success = true
+        k.message = "Killing running driver"
+      } else if (removeFromQueuedDrivers(submissionId)) {
+        k.success = true
+        k.message = "Removed driver while it's still pending"
+      } else if (removeFromPendingRetryDrivers(submissionId)) {
+        k.success = true
+        k.message = "Removed driver while it's being retried"
+      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+        k.success = false
+        k.message = "Driver already terminated"
+      } else {
+        k.success = false
+        k.message = "Cannot find driver"
+      }
+    }
+    k
+  }
+
+  def getDriverStatus(submissionId: String): SubmissionStatusResponse = {
+    val s = new SubmissionStatusResponse
+    if (!ready) {
+      s.success = false
+      s.message = "Scheduler is not ready to take requests"
+      return s
+    }
+    s.submissionId = submissionId
+    stateLock.synchronized {
+      if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
+        s.success = true
+        s.driverState = "QUEUED"
+      } else if (launchedDrivers.contains(submissionId)) {
+        s.success = true
+        s.driverState = "RUNNING"
+        launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
+      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+        s.success = true
+        s.driverState = "FINISHED"
+        finishedDrivers
+          .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
+          .foreach(state => s.message = state.toString)
+      } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
+        val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+          .get.retryState.get.lastFailureStatus
+        s.success = true
+        s.driverState = "RETRYING"
+        s.message = status.toString
+      } else {
+        s.success = false
+        s.driverState = "NOT_FOUND"
+      }
+    }
+    s
+  }
+
+  private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity
+
+  /**
+   * Recover scheduler state that is persisted.
+   * We still need to do task reconciliation to be up to date with the latest task states,
+   * as they might have changed while the scheduler was failing over.
+   */
+  private def recoverState(): Unit = {
+    stateLock.synchronized {
+      launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
+        launchedDrivers(state.taskId.getValue) = state
+        pendingRecover(state.taskId.getValue) = state.slaveId
+      }
+      queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
+      // There is a potential timing issue where a queued driver might have been launched
+      // but the scheduler shuts down before the queued driver was able to be removed
+      // from the queue. We try to mitigate this issue by walking through all queued drivers
+      // and removing them if they're already launched.
+      queuedDrivers
+        .filter(d => launchedDrivers.contains(d.submissionId))
+        .foreach(d => removeFromQueuedDrivers(d.submissionId))
+      pendingRetryDriversState.fetchAll[MesosDriverDescription]()
+        .foreach(s => pendingRetryDrivers += s)
+      // TODO: Consider storing finished drivers so we can show them on the UI after
+      // failover. For now we clear the history on each recovery.
+      finishedDrivers.clear()
+    }
+  }
+
+  /**
+   * Starts the cluster scheduler and waits until the scheduler is registered.
+   * This also marks the scheduler to be ready for requests.
+   */
+  def start(): Unit = {
+    // TODO: Implement leader election to make sure only one framework is running in the cluster.
+    val fwId = schedulerState.fetch[String]("frameworkId")
+    val builder = FrameworkInfo.newBuilder()
+      .setUser(Utils.getCurrentUserName())
+      .setName(appName)
+      .setWebuiUrl(frameworkUrl)
+      .setCheckpoint(true)
+      .setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash
+    fwId.foreach { id =>
+      builder.setId(FrameworkID.newBuilder().setValue(id).build())
+      frameworkId = id
+    }
+    recoverState()
+    metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
+    metricsSystem.start()
+    startScheduler(master, MesosClusterScheduler.this, builder.build())
+    ready = true
+  }
+
+  def stop(): Unit = {
+    ready = false // counterpart of `ready = true` at the end of start()
+    metricsSystem.report() // report metrics before the metrics system is stopped
+    metricsSystem.stop()
+    mesosDriver.stop(true) // NOTE(review): presumably failover = true keeps tasks alive; confirm
+  }
+
+  override def registered(
+      driver: SchedulerDriver,
+      newFrameworkId: FrameworkID,
+      masterInfo: MasterInfo): Unit = {
+    val registeredId = newFrameworkId.getValue
+    logInfo("Registered as framework ID " + registeredId)
+    // Persist the framework id only when it differs from the one already known.
+    if (registeredId != frameworkId) {
+      frameworkId = registeredId
+      schedulerState.persist("frameworkId", frameworkId)
+    }
+    markRegistered()
+
+    stateLock.synchronized {
+      this.masterInfo = Some(masterInfo)
+      if (pendingRecover.nonEmpty) {
+        // Recovered drivers exist, so start task reconciliation. For each of them the last
+        // known status is sent when there is one, otherwise a TASK_STAGING placeholder.
+        val statuses = pendingRecover.map { case (taskId, slaveId) =>
+          val placeholder = TaskStatus.newBuilder()
+            .setTaskId(TaskID.newBuilder().setValue(taskId).build())
+            .setSlaveId(slaveId)
+            .setState(MesosTaskState.TASK_STAGING)
+            .build()
+          launchedDrivers.get(taskId).flatMap(_.mesosTaskStatus).getOrElse(placeholder)
+        }
+        // TODO: Page the status updates to avoid trying to reconcile
+        // a large amount of tasks at once.
+        driver.reconcileTasks(statuses)
+      }
+    }
+  }
+
+  // Builds the Mesos CommandInfo that runs spark-submit for the given driver description.
+  private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
+    // The application jar is always added as a URI, with any "file:" / "local:" prefix removed.
+    val jarUri = CommandInfo.URI.newBuilder()
+      .setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:"))
+      .build()
+    val commandBuilder = CommandInfo.newBuilder().addUris(jarUri)
+
+    val libraryPaths =
+      conf.getOption("spark.executor.extraLibraryPath").toList ++ desc.command.libraryPathEntries
+    val prefixEnv = if (libraryPaths.isEmpty) "" else Utils.libraryPathEnvPrefix(libraryPaths)
+
+    val environment = Environment.newBuilder()
+    desc.command.environment.foreach { case (name, value) =>
+      environment.addVariables(Variable.newBuilder().setName(name).setValue(value).build())
+    }
+    // Pass all spark properties to executor.
+    val executorOpts = desc.schedulerProperties
+      .map { case (key, value) => s"-D$key=$value" }
+      .mkString(" ")
+    environment.addVariables(
+      Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
+
+    val submitOptions = generateCmdOption(desc).mkString(" ")
+    val executorUri = desc.schedulerProperties.get("spark.executor.uri")
+      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+    val appArguments = desc.command.arguments.mkString(" ")
+
+    val cmd = executorUri match {
+      case Some(uri) =>
+        // With an executor URI, the URI is added as a resource and spark-submit is run from
+        // the directory matching the URI's base name (the text before its first '.').
+        commandBuilder.addUris(CommandInfo.URI.newBuilder().setValue(uri).build())
+        val folderBasename = uri.split('/').last.split('.').head
+        val jarName = desc.jarUrl.split("/").last
+        s"cd $folderBasename*; $prefixEnv bin/spark-submit $submitOptions ../$jarName $appArguments"
+      case None =>
+        // Without an executor URI, spark-submit is resolved from a configured Spark home.
+        val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
+          .orElse(conf.getOption("spark.home"))
+          .orElse(Option(System.getenv("SPARK_HOME")))
+          .getOrElse {
+            throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+          }
+        val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
+        val jarName = desc.jarUrl.split("/").last
+        s"$cmdExecutable $submitOptions $jarName $appArguments"
+    }
+    commandBuilder.setValue(cmd)
+    commandBuilder.setEnvironment(environment.build())
+    commandBuilder.build()
+  }
+
+  // Assembles the spark-submit command line options for the given driver description.
+  private def generateCmdOption(desc: MesosDriverDescription): Seq[String] = {
+    val props = desc.schedulerProperties
+    // Options that are always passed to spark-submit.
+    val mandatory = Seq(
+      "--name", props("spark.app.name"),
+      "--class", desc.command.mainClass,
+      "--master", s"mesos://${conf.get("spark.master")}",
+      "--driver-cores", desc.cores.toString,
+      "--driver-memory", s"${desc.mem}M")
+    // Optional settings are forwarded only when the submission defines them.
+    val executorMemory = props.get("spark.executor.memory").toSeq
+      .flatMap(v => Seq("--executor-memory", v))
+    val totalCores = props.get("spark.cores.max").toSeq
+      .flatMap(v => Seq("--total-executor-cores", v))
+    mandatory ++ executorMemory ++ totalCores
+  }
+
+  // Pairs a Mesos offer with the cpu and mem still available in it; scheduleTasks
+  // decrements both values as drivers are assigned to the offer.
+  private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+    override def toString(): String = {
+      val offerId = offer.getId.getValue
+      s"Offer id: $offerId, cpu: $cpu, mem: $mem"
+    }
+  }
+
+  /**
+   * This method takes all the possible candidates and attempts to schedule them with Mesos
+   * offers. Every time a new task is scheduled, the afterLaunchCallback is called to perform
+   * post-scheduling logic on that task.
+   */
+  private def scheduleTasks(
+      candidates: Seq[MesosDriverDescription],
+      afterLaunchCallback: (String) => Boolean,
+      currentOffers: List[ResourceOffer],
+      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
+    // afterLaunchCallback may remove the launched driver from the very buffer that backs
+    // `candidates` (resourceOffers passes queuedDrivers with removeFromQueuedDrivers).
+    // Iterate over an immutable snapshot so that such removals can neither skip candidates
+    // nor corrupt the traversal.
+    for (submission <- candidates.toList) {
+      val driverCpu = submission.cores
+      val driverMem = submission.mem
+      logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
+      // First fit: use the first offer that still has enough cpu and memory left.
+      currentOffers.find(o => o.cpu >= driverCpu && o.mem >= driverMem) match {
+        case None =>
+          logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
+            s"cpu: $driverCpu, mem: $driverMem")
+        case Some(offer) =>
+          // Claim the resources so later candidates only see what is left of this offer.
+          offer.cpu -= driverCpu
+          offer.mem -= driverMem
+          val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
+          val cpuResource = Resource.newBuilder()
+            .setName("cpus").setType(Value.Type.SCALAR)
+            .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
+          val memResource = Resource.newBuilder()
+            .setName("mem").setType(Value.Type.SCALAR)
+            .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
+          val commandInfo = buildDriverCommand(submission)
+          val appName = submission.schedulerProperties("spark.app.name")
+          val taskInfo = TaskInfo.newBuilder()
+            .setTaskId(taskId)
+            .setName(s"Driver for $appName")
+            .setSlaveId(offer.offer.getSlaveId)
+            .setCommand(commandInfo)
+            .addResources(cpuResource)
+            .addResources(memResource)
+            .build()
+          // Tasks are grouped by offer id; the caller launches each group against its offer.
+          val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
+          queuedTasks += taskInfo
+          logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+            submission.submissionId)
+          // Record the driver as launched, in memory and in the persisted state, before the
+          // callback removes it from the collection it was scheduled from.
+          val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
+            None, new Date())
+          launchedDrivers(submission.submissionId) = newState
+          launchedDriversState.persist(submission.submissionId, newState)
+          afterLaunchCallback(submission.submissionId)
+      }
+    }
+  }
+
+  // Handles new offers: schedules drivers awaiting retry first, then queued drivers, launches
+  // the resulting tasks and declines every offer that ended up unused.
+  override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
+    // Track the cpu/mem left in each offer so several drivers can share a single offer.
+    val currentOffers = offers.map { offer =>
+      val resources = offer.getResourcesList
+      new ResourceOffer(offer, getResource(resources, "cpus"), getResource(resources, "mem"))
+    }.toList
+    logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
+    val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
+    val currentTime = new Date()
+
+    stateLock.synchronized {
+      // We first schedule all the supervised drivers that are ready to retry.
+      // This list will be empty if none of the drivers are marked as supervise.
+      val driversToRetry = pendingRetryDrivers.filter { pending =>
+        val retryAt = pending.retryState.get.nextRetry
+        retryAt.before(currentTime)
+      }
+      scheduleTasks(driversToRetry, removeFromPendingRetryDrivers, currentOffers, tasks)
+      // Then we walk through the queued drivers and try to schedule them.
+      scheduleTasks(queuedDrivers, removeFromQueuedDrivers, currentOffers, tasks)
+    }
+    tasks.foreach { case (offerId, offerTasks) =>
+      driver.launchTasks(Collections.singleton(offerId), offerTasks)
+    }
+    // Offers that received no task are declined.
+    for (offer <- offers if !tasks.contains(offer.getId)) {
+      driver.declineOffer(offer.getId)
+    }
+  }
+
+  // Returns a snapshot of the scheduler state built from copies of the driver collections,
+  // taken while holding stateLock.
+  def getSchedulerState(): MesosClusterSchedulerState = {
+    // Copies a buffer so that the returned state does not share it with the scheduler.
+    def copyBuffer(
+        buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
+      val snapshot = new ArrayBuffer[MesosDriverDescription](buffer.size)
+      snapshot ++= buffer
+      snapshot
+    }
+    stateLock.synchronized {
+      val currentFrameworkId = frameworkId
+      val masterUrl = masterInfo.map(m => s"http://${m.getIp}:${m.getPort}")
+      val queued = copyBuffer(queuedDrivers)
+      val launched = launchedDrivers.values.map(_.copy()).toList
+      val finished = finishedDrivers.map(_.copy()).toList
+      val retrying = copyBuffer(pendingRetryDrivers)
+      new MesosClusterSchedulerState(
+        currentFrameworkId, masterUrl, queued, launched, finished, retrying)
+    }
+  }
+
+  // The following scheduler callbacks require no action from the cluster scheduler.
+  override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = ()
+  override def disconnected(driver: SchedulerDriver): Unit = ()
+  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = ()
+
+  // Re-registration and errors are only logged.
+  override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = {
+    val masterId = masterInfo.getId
+    logInfo(s"Framework re-registered with master $masterId")
+  }
+  override def error(driver: SchedulerDriver, error: String): Unit = {
+    logError(s"Error received: $error")
+  }
+
+  /**
+   * Check if the task state is a recoverable state, i.e. one from which we can relaunch
+   * the task. Task states like TASK_ERROR are not relaunchable, since the task could not
+   * be validated by Mesos.
+   */
+  private def shouldRelaunch(state: MesosTaskState): Boolean = state match {
+    // Only failed, killed and lost tasks are relaunched.
+    case MesosTaskState.TASK_FAILED | MesosTaskState.TASK_KILLED | MesosTaskState.TASK_LOST =>
+      true
+    case _ =>
+      false
+  }
+
+  // Applies a task status update to a launched driver: supervised drivers in a relaunchable
+  // state are queued for retry, finished drivers are moved to the finished list.
+  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
+    val taskId = status.getTaskId.getValue
+    stateLock.synchronized {
+      launchedDrivers.get(taskId) match {
+        case None =>
+          logError(s"Unable to find driver $taskId in status update")
+        case Some(_) if status.getReason == Reason.REASON_RECONCILIATION &&
+            !pendingRecover.contains(taskId) =>
+          // Task has already received update and no longer requires reconciliation.
+        case Some(state) =>
+          val description = state.driverDescription
+          // Check if the driver is supervise enabled and can be relaunched.
+          if (description.supervise && shouldRelaunch(status.getState)) {
+            removeFromLaunchedDrivers(taskId)
+            // First retry: one attempt with a wait of 1. Later retries double the previous
+            // wait, capped at maxRetryWaitTime.
+            val (retries, waitTimeSec) = description.retryState match {
+              case Some(rs) => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2))
+              case None => (1, 1)
+            }
+            val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
+
+            val newRetryState = new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)
+            val retryDescription = description.copy(retryState = Some(newRetryState))
+            pendingRetryDrivers += retryDescription
+            pendingRetryDriversState.persist(taskId, retryDescription)
+          } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
+            removeFromLaunchedDrivers(taskId)
+            // Make room by dropping the oldest entries once the retention limit is reached.
+            if (finishedDrivers.size >= retainedDrivers) {
+              val toRemove = math.max(retainedDrivers / 10, 1)
+              finishedDrivers.trimStart(toRemove)
+            }
+            finishedDrivers += state
+          }
+          state.mesosTaskStatus = Option(status)
+      }
+    }
+  }
+
+  // Framework messages are ignored by the cluster scheduler.
+  override def frameworkMessage(
+      driver: SchedulerDriver,
+      executorId: ExecutorID,
+      slaveId: SlaveID,
+      message: Array[Byte]): Unit = ()
+
+  // Losing an executor requires no action here.
+  override def executorLost(
+      driver: SchedulerDriver,
+      executorId: ExecutorID,
+      slaveId: SlaveID,
+      status: Int): Unit = ()
+
+  // Removes the queued driver with the given submission id, in memory and from the
+  // persisted queue. Returns false when no such driver is queued.
+  private def removeFromQueuedDrivers(id: String): Boolean = {
+    queuedDrivers.indexWhere(_.submissionId.equals(id)) match {
+      case -1 =>
+        false
+      case position =>
+        queuedDrivers.remove(position)
+        queuedDriversState.expunge(id)
+        true
+    }
+  }
+
+  // Removes the launched driver with the given id, in memory and from the persisted
+  // state. Returns false when no such driver was launched.
+  private def removeFromLaunchedDrivers(id: String): Boolean = {
+    val wasLaunched = launchedDrivers.remove(id).nonEmpty
+    if (wasLaunched) {
+      launchedDriversState.expunge(id)
+    }
+    wasLaunched
+  }
+
+  // Removes the driver with the given submission id from the pending-retry list, in memory
+  // and from the persisted state. Returns false when no such driver is pending.
+  private def removeFromPendingRetryDrivers(id: String): Boolean = {
+    pendingRetryDrivers.indexWhere(_.submissionId.equals(id)) match {
+      case -1 =>
+        false
+      case position =>
+        pendingRetryDrivers.remove(position)
+        pendingRetryDriversState.expunge(id)
+        true
+    }
+  }
+
+  // Current sizes of the driver collections, read by the MesosClusterSchedulerSource gauges.
+  def getQueuedDriversSize: Int = queuedDrivers.length
+  def getLaunchedDriversSize: Int = launchedDrivers.size
+  def getPendingRetryDriversSize: Int = pendingRetryDrivers.length
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
new file mode 100644
index 0000000..1fe9497
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+/**
+ * Metrics source that exposes the number of waiting, launched and pending-retry drivers
+ * of a [[MesosClusterScheduler]] as gauges.
+ */
+private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler)
+  extends Source {
+  override def sourceName: String = "mesos_cluster"
+
+  // This has to be a val: as a def it would hand out a brand new, empty registry on every
+  // access, so the gauges below would be registered on instances that are immediately
+  // discarded and the metrics system would never see them.
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  metricRegistry.register(MetricRegistry.name("waitingDrivers"), new Gauge[Int] {
+    override def getValue: Int = scheduler.getQueuedDriversSize
+  })
+
+  metricRegistry.register(MetricRegistry.name("launchedDrivers"), new Gauge[Int] {
+    override def getValue: Int = scheduler.getLaunchedDriversSize
+  })
+
+  metricRegistry.register(MetricRegistry.name("retryDrivers"), new Gauge[Int] {
+    override def getValue: Int = scheduler.getPendingRetryDriversSize
+  })
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d9d62b0..8346a24 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -18,23 +18,19 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
+import java.util.{ArrayList => JArrayList, Collections, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
-  ExecutorInfo => MesosExecutorInfo, _}
-
+import org.apache.mesos.{Scheduler => MScheduler, _}
 import org.apache.spark.executor.MesosExecutorBackend
-import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkException, TaskState}
 
 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -47,14 +43,7 @@ private[spark] class MesosSchedulerBackend(
     master: String)
   extends SchedulerBackend
   with MScheduler
-  with Logging {
-
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
+  with MesosSchedulerUtils {
 
   // Which slave IDs we have executors on
   val slaveIdsWithExecutors = new HashSet[String]
@@ -73,26 +62,9 @@ private[spark] class MesosSchedulerBackend(
   @volatile var appId: String = _
 
   override def start() {
-    synchronized {
-      classLoader = Thread.currentThread.getContextClassLoader
-
-      new Thread("MesosSchedulerBackend driver") {
-        setDaemon(true)
-        override def run() {
-          val scheduler = MesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          } catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      waitForRegister()
-    }
+    val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
+    classLoader = Thread.currentThread.getContextClassLoader
+    startScheduler(master, MesosSchedulerBackend.this, fwInfo)
   }
 
   def createExecutorInfo(execId: String): MesosExecutorInfo = {
@@ -125,17 +97,19 @@ private[spark] class MesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val uri = sc.conf.get("spark.executor.uri", null)
+    val uri = sc.conf.getOption("spark.executor.uri")
+      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+
     val executorBackendName = classOf[MesosExecutorBackend].getName
-    if (uri == null) {
+    if (uri.isEmpty) {
       val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath
       command.setValue(s"$prefixEnv $executorPath $executorBackendName")
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
-      val basename = uri.split('/').last.split('.').head
+      val basename = uri.get.split('/').last.split('.').head
       command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
     }
     val cpus = Resource.newBuilder()
       .setName("cpus")
@@ -181,18 +155,7 @@ private[spark] class MesosSchedulerBackend(
     inClassLoader() {
       appId = frameworkId.getValue
       logInfo("Registered as framework ID " + appId)
-      registeredLock.synchronized {
-        isRegistered = true
-        registeredLock.notifyAll()
-      }
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
+      markRegistered()
     }
   }
 
@@ -287,14 +250,6 @@ private[spark] class MesosSchedulerBackend(
     }
   }
 
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    0
-  }
-
   /** Turn a Spark TaskDescription into a Mesos task */
   def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
     val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
@@ -339,13 +294,13 @@ private[spark] class MesosSchedulerBackend(
   }
 
   override def stop() {
-    if (driver != null) {
-      driver.stop()
+    if (mesosDriver != null) {
+      mesosDriver.stop()
     }
   }
 
   override def reviveOffers() {
-    driver.reviveOffers()
+    mesosDriver.reviveOffers()
   }
 
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
@@ -380,7 +335,7 @@ private[spark] class MesosSchedulerBackend(
   }
 
   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
-    driver.killTask(
+    mesosDriver.killTask(
       TaskID.newBuilder()
         .setValue(taskId.toString).build()
     )

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
new file mode 100644
index 0000000..d11228f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.List
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.JavaConversions._
+
+import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
+ * methods that Mesos schedulers will use.
+ */
+private[mesos] trait MesosSchedulerUtils extends Logging {
+  // Released by markRegistered(); startScheduler blocks on it until then.
+  private final val registerLatch = new CountDownLatch(1)
+
+  // Driver for talking to Mesos
+  protected var mesosDriver: MesosSchedulerDriver = null
+
+  /**
+   * Runs a MesosSchedulerDriver for the given scheduler on a daemon thread and blocks until
+   * markRegistered() has been called. The JVM exits with status 1 if the driver aborts or
+   * if running it throws an exception.
+   * @param masterUrl Mesos master connection URL
+   * @param scheduler Scheduler object
+   * @param fwInfo FrameworkInfo to pass to the Mesos master
+   */
+  def startScheduler(masterUrl: String, scheduler: Scheduler, fwInfo: FrameworkInfo): Unit = {
+    synchronized {
+      // A driver thread is only spawned when no driver exists yet; otherwise the call
+      // just waits for the registration below.
+      if (mesosDriver == null) {
+        val threadName = Utils.getFormattedClassName(this) + "-mesos-driver"
+        val driverThread = new Thread(new Runnable {
+          override def run(): Unit = {
+            mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, masterUrl)
+            try {
+              val ret = mesosDriver.run()
+              logInfo("driver.run() returned with code " + ret)
+              if (ret.equals(Status.DRIVER_ABORTED)) {
+                System.exit(1)
+              }
+            } catch {
+              case e: Exception =>
+                logError("driver.run() failed", e)
+                System.exit(1)
+            }
+          }
+        }, threadName)
+        driverThread.setDaemon(true)
+        driverThread.start()
+      }
+
+      registerLatch.await()
+    }
+  }
+
+  /**
+   * Releases every caller blocked in startScheduler; to be called once the scheduler has
+   * registered with Mesos.
+   */
+  protected def markRegistered(): Unit = {
+    registerLatch.countDown()
+  }
+
+  /**
+   * Returns the scalar value of the first resource with the given name,
+   * or 0.0 when the list contains no such resource.
+   */
+  protected def getResource(res: List[Resource], name: String): Double = {
+    res.find(_.getName == name).map(_.getScalar.getValue).getOrElse(0.0)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4561e5b..c4e6f06 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -231,7 +231,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
     val childArgsStr = childArgs.mkString(" ")
     if (useRest) {
       childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
-      mainClass should be ("org.apache.spark.deploy.rest.StandaloneRestClient")
+      mainClass should be ("org.apache.spark.deploy.rest.RestSubmissionClient")
     } else {
       childArgsStr should startWith ("--supervise --memory 4g --cores 5")
       childArgsStr should include regex "launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2"

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 8e09976..0a318a2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -39,9 +39,9 @@ import org.apache.spark.deploy.master.DriverState._
  * Tests for the REST application submission protocol used in standalone cluster mode.
  */
 class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
-  private val client = new StandaloneRestClient
+  private val client = new RestSubmissionClient
   private var actorSystem: Option[ActorSystem] = None
-  private var server: Option[StandaloneRestServer] = None
+  private var server: Option[RestSubmissionServer] = None
 
   override def afterEach() {
     actorSystem.foreach(_.shutdown())
@@ -89,7 +89,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     conf.set("spark.app.name", "dreamer")
     val appArgs = Array("one", "two", "six")
     // main method calls this
-    val response = StandaloneRestClient.run("app-resource", "main-class", appArgs, conf)
+    val response = RestSubmissionClient.run("app-resource", "main-class", appArgs, conf)
     val submitResponse = getSubmitResponse(response)
     assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
     assert(submitResponse.serverSparkVersion === SPARK_VERSION)
@@ -208,7 +208,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("good request paths") {
     val masterUrl = startSmartServer()
     val httpUrl = masterUrl.replace("spark://", "http://")
-    val v = StandaloneRestServer.PROTOCOL_VERSION
+    val v = RestSubmissionServer.PROTOCOL_VERSION
     val json = constructSubmitRequest(masterUrl).toJson
     val submitRequestPath = s"$httpUrl/$v/submissions/create"
     val killRequestPath = s"$httpUrl/$v/submissions/kill"
@@ -238,7 +238,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("good request paths, bad requests") {
     val masterUrl = startSmartServer()
     val httpUrl = masterUrl.replace("spark://", "http://")
-    val v = StandaloneRestServer.PROTOCOL_VERSION
+    val v = RestSubmissionServer.PROTOCOL_VERSION
     val submitRequestPath = s"$httpUrl/$v/submissions/create"
     val killRequestPath = s"$httpUrl/$v/submissions/kill"
     val statusRequestPath = s"$httpUrl/$v/submissions/status"
@@ -276,7 +276,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("bad request paths") {
     val masterUrl = startSmartServer()
     val httpUrl = masterUrl.replace("spark://", "http://")
-    val v = StandaloneRestServer.PROTOCOL_VERSION
+    val v = RestSubmissionServer.PROTOCOL_VERSION
     val (response1, code1) = sendHttpRequestWithResponse(httpUrl, "GET")
     val (response2, code2) = sendHttpRequestWithResponse(s"$httpUrl/", "GET")
     val (response3, code3) = sendHttpRequestWithResponse(s"$httpUrl/$v", "GET")
@@ -292,7 +292,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     assert(code5 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code6 === HttpServletResponse.SC_BAD_REQUEST)
     assert(code7 === HttpServletResponse.SC_BAD_REQUEST)
-    assert(code8 === StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
+    assert(code8 === RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION)
     // all responses should be error responses
     val errorResponse1 = getErrorResponse(response1)
     val errorResponse2 = getErrorResponse(response2)
@@ -310,13 +310,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     assert(errorResponse5.highestProtocolVersion === null)
     assert(errorResponse6.highestProtocolVersion === null)
     assert(errorResponse7.highestProtocolVersion === null)
-    assert(errorResponse8.highestProtocolVersion === StandaloneRestServer.PROTOCOL_VERSION)
+    assert(errorResponse8.highestProtocolVersion === RestSubmissionServer.PROTOCOL_VERSION)
   }
 
   test("server returns unknown fields") {
     val masterUrl = startSmartServer()
     val httpUrl = masterUrl.replace("spark://", "http://")
-    val v = StandaloneRestServer.PROTOCOL_VERSION
+    val v = RestSubmissionServer.PROTOCOL_VERSION
     val submitRequestPath = s"$httpUrl/$v/submissions/create"
     val oldJson = constructSubmitRequest(masterUrl).toJson
     val oldFields = parse(oldJson).asInstanceOf[JObject].obj
@@ -340,7 +340,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("client handles faulty server") {
     val masterUrl = startFaultyServer()
     val httpUrl = masterUrl.replace("spark://", "http://")
-    val v = StandaloneRestServer.PROTOCOL_VERSION
+    val v = RestSubmissionServer.PROTOCOL_VERSION
     val submitRequestPath = s"$httpUrl/$v/submissions/create"
     val killRequestPath = s"$httpUrl/$v/submissions/kill/anything"
     val statusRequestPath = s"$httpUrl/$v/submissions/status/anything"
@@ -400,9 +400,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val fakeMasterRef = _actorSystem.actorOf(Props(makeFakeMaster))
     val _server =
       if (faulty) {
-        new FaultyStandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf)
+        new FaultyStandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077")
       } else {
-        new StandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf)
+        new StandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077")
       }
     val port = _server.start()
     // set these to clean them up after every test
@@ -563,20 +563,18 @@ private class SmarterMaster extends Actor {
 private class FaultyStandaloneRestServer(
     host: String,
     requestedPort: Int,
+    masterConf: SparkConf,
     masterActor: ActorRef,
-    masterUrl: String,
-    masterConf: SparkConf)
-  extends StandaloneRestServer(host, requestedPort, masterActor, masterUrl, masterConf) {
+    masterUrl: String)
+  extends RestSubmissionServer(host, requestedPort, masterConf) {
 
-  protected override val contextToServlet = Map[String, StandaloneRestServlet](
-    s"$baseContext/create/*" -> new MalformedSubmitServlet,
-    s"$baseContext/kill/*" -> new InvalidKillServlet,
-    s"$baseContext/status/*" -> new ExplodingStatusServlet,
-    "/*" -> new ErrorServlet
-  )
+  protected override val submitRequestServlet = new MalformedSubmitServlet
+  protected override val killRequestServlet = new InvalidKillServlet
+  protected override val statusRequestServlet = new ExplodingStatusServlet
 
   /** A faulty servlet that produces malformed responses. */
-  class MalformedSubmitServlet extends SubmitRequestServlet(masterActor, masterUrl, masterConf) {
+  class MalformedSubmitServlet
+    extends StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf) {
     protected override def sendResponse(
         responseMessage: SubmitRestProtocolResponse,
         responseServlet: HttpServletResponse): Unit = {
@@ -586,7 +584,7 @@ private class FaultyStandaloneRestServer(
   }
 
   /** A faulty servlet that produces invalid responses. */
-  class InvalidKillServlet extends KillRequestServlet(masterActor, masterConf) {
+  class InvalidKillServlet extends StandaloneKillRequestServlet(masterActor, masterConf) {
     protected override def handleKill(submissionId: String): KillSubmissionResponse = {
       val k = super.handleKill(submissionId)
       k.submissionId = null
@@ -595,7 +593,7 @@ private class FaultyStandaloneRestServer(
   }
 
   /** A faulty status servlet that explodes. */
-  class ExplodingStatusServlet extends StatusRequestServlet(masterActor, masterConf) {
+  class ExplodingStatusServlet extends StandaloneStatusRequestServlet(masterActor, masterConf) {
     private def explode: Int = 1 / 0
     protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
       val s = super.handleStatus(submissionId)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[3/3] spark git commit: [SPARK-5338] [MESOS] Add cluster mode support for Mesos

Posted by an...@apache.org.
[SPARK-5338] [MESOS] Add cluster mode support for Mesos

This patch adds support for running Spark in cluster mode on Mesos.
It introduces a new Mesos framework dedicated to launching new apps/drivers. Drivers are submitted with the spark-submit script by pointing the --master flag at the cluster mode REST interface instead of the Mesos master.

Example:
./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master mesos://10.0.0.206:8077 --executor-memory 1G --total-executor-cores 100 examples/target/spark-examples_2.10-1.3.0-SNAPSHOT.jar 30

This patch also abstracts the StandaloneRestServer so that the REST endpoints can have different implementations.

Features of the cluster mode in this PR:
- Supports supervise mode, where the scheduler keeps trying to reschedule a job that has exited.
- Adds a new UI for the cluster mode scheduler that shows all running jobs, finished jobs, and supervised jobs waiting to be retried.
- Supports state persistence to ZooKeeper, so that when the cluster scheduler fails over it can pick up all queued and running jobs.

Author: Timothy Chen <tn...@gmail.com>
Author: Luc Bourlier <lu...@typesafe.com>

Closes #5144 from tnachen/mesos_cluster_mode and squashes the following commits:

069e946 [Timothy Chen] Fix rebase.
e24b512 [Timothy Chen] Persist submitted driver.
390c491 [Timothy Chen] Fix zk conf key for mesos zk engine.
e324ac1 [Timothy Chen] Fix merge.
fd5259d [Timothy Chen] Address review comments.
1553230 [Timothy Chen] Address review comments.
c6c6b73 [Timothy Chen] Pass spark properties to mesos cluster tasks.
f7d8046 [Timothy Chen] Change app name to spark cluster.
17f93a2 [Timothy Chen] Fix head of line blocking in scheduling drivers.
6ff8e5c [Timothy Chen] Address comments and add logging.
df355cd [Timothy Chen] Add metrics to mesos cluster scheduler.
20f7284 [Timothy Chen] Address review comments
7252612 [Timothy Chen] Fix tests.
a46ad66 [Timothy Chen] Allow zk cli param override.
920fc4b [Timothy Chen] Fix scala style issues.
862b5b5 [Timothy Chen] Support asking driver status when it's retrying.
7f214c2 [Timothy Chen] Fix RetryState visibility
e0f33f7 [Timothy Chen] Add supervise support and persist retries.
371ce65 [Timothy Chen] Handle cluster mode recovery and state persistence.
3d4dfa1 [Luc Bourlier] Adds support to kill submissions
febfaba [Timothy Chen] Bound the finished drivers in memory
543a98d [Timothy Chen] Schedule multiple jobs
6887e5e [Timothy Chen] Support looking at SPARK_EXECUTOR_URI env variable in schedulers
8ec76bc [Timothy Chen] Fix Mesos dispatcher UI.
d57d77d [Timothy Chen] Add documentation
825afa0 [Luc Bourlier] Supports more spark-submit parameters
b8e7181 [Luc Bourlier] Adds a shutdown latch to keep the deamon running
0fa7780 [Luc Bourlier] Launch task through the mesos scheduler
5b7a12b [Timothy Chen] WIP: Making a cluster mode a mesos framework.
4b2f5ef [Timothy Chen] Specify user jar in command to be replaced with local.
e775001 [Timothy Chen] Support fetching remote uris in driver runner.
7179495 [Timothy Chen] Change Driver page output and add logging
880bc27 [Timothy Chen] Add Mesos Cluster UI to display driver results
9986731 [Timothy Chen] Kill drivers when shutdown
67cbc18 [Timothy Chen] Rename StandaloneRestClient to RestClient and add sbin scripts
e3facdd [Timothy Chen] Add Mesos Cluster dispatcher


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53befacc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53befacc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53befacc

Branch: refs/heads/master
Commit: 53befacced828bbac53c6e3a4976ec3f036bae9e
Parents: 8009810
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue Apr 28 13:31:08 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Apr 28 13:33:57 2015 -0700

----------------------------------------------------------------------
 .../spark/deploy/FaultToleranceTest.scala       |   2 +-
 .../apache/spark/deploy/SparkCuratorUtil.scala  |  66 ++
 .../org/apache/spark/deploy/SparkSubmit.scala   |  48 +-
 .../spark/deploy/SparkSubmitArguments.scala     |  11 +-
 .../org/apache/spark/deploy/master/Master.scala |   2 +-
 .../spark/deploy/master/SparkCuratorUtil.scala  |  64 --
 .../master/ZooKeeperLeaderElectionAgent.scala   |   1 +
 .../master/ZooKeeperPersistenceEngine.scala     |   1 +
 .../deploy/mesos/MesosClusterDispatcher.scala   | 116 ++++
 .../mesos/MesosClusterDispatcherArguments.scala | 101 +++
 .../deploy/mesos/MesosDriverDescription.scala   |  65 ++
 .../deploy/mesos/ui/MesosClusterPage.scala      | 114 ++++
 .../spark/deploy/mesos/ui/MesosClusterUI.scala  |  48 ++
 .../deploy/rest/RestSubmissionClient.scala      | 344 +++++++++++
 .../deploy/rest/RestSubmissionServer.scala      | 318 ++++++++++
 .../deploy/rest/StandaloneRestClient.scala      | 335 ----------
 .../deploy/rest/StandaloneRestServer.scala      | 344 ++---------
 .../deploy/rest/SubmitRestProtocolRequest.scala |   2 +-
 .../rest/SubmitRestProtocolResponse.scala       |   6 +-
 .../deploy/rest/mesos/MesosRestServer.scala     | 158 +++++
 .../mesos/CoarseMesosSchedulerBackend.scala     |  82 +--
 .../mesos/MesosClusterPersistenceEngine.scala   | 134 ++++
 .../cluster/mesos/MesosClusterScheduler.scala   | 608 +++++++++++++++++++
 .../mesos/MesosClusterSchedulerSource.scala     |  40 ++
 .../cluster/mesos/MesosSchedulerBackend.scala   |  85 +--
 .../cluster/mesos/MesosSchedulerUtils.scala     |  95 +++
 .../apache/spark/deploy/SparkSubmitSuite.scala  |   2 +-
 .../deploy/rest/StandaloneRestSubmitSuite.scala |  46 +-
 .../mesos/MesosClusterSchedulerSuite.scala      |  76 +++
 docs/running-on-mesos.md                        |  23 +-
 sbin/start-mesos-dispatcher.sh                  |  40 ++
 sbin/stop-mesos-dispatcher.sh                   |  27 +
 32 files changed, 2529 insertions(+), 875 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index a7c8927..c048b78 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -32,7 +32,7 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
-import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
+import org.apache.spark.deploy.master.RecoveryState
 import org.apache.spark.util.Utils
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
new file mode 100644
index 0000000..b8d3993
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.JavaConversions._
+
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.zookeeper.KeeperException
+
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * Utilities for creating a Curator client and for creating and removing ZooKeeper nodes.
+ */
+private[spark] object SparkCuratorUtil extends Logging {
+
+  private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
+  private val ZK_SESSION_TIMEOUT_MILLIS = 60000
+  private val RETRY_WAIT_MILLIS = 5000
+  private val MAX_RECONNECT_ATTEMPTS = 3
+
+  /**
+   * Create and start a Curator client.
+   *
+   * @param conf configuration holding the ZooKeeper connection string
+   * @param zkUrlConf name of the configuration key the connection string is read from
+   * @return a [[CuratorFramework]] on which `start()` has already been called
+   */
+  def newClient(
+      conf: SparkConf,
+      zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
+    val ZK_URL = conf.get(zkUrlConf)
+    val zk = CuratorFrameworkFactory.newClient(ZK_URL,
+      ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
+      new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
+    zk.start()
+    zk
+  }
+
+  /**
+   * Create `path`, including any missing parents, unless it already exists.
+   *
+   * @param zk started Curator client
+   * @param path absolute ZooKeeper path to create
+   */
+  def mkdir(zk: CuratorFramework, path: String): Unit = {
+    if (zk.checkExists().forPath(path) == null) {
+      try {
+        zk.create().creatingParentsIfNeeded().forPath(path)
+      } catch {
+        case _: KeeperException.NodeExistsException =>
+          // The node appeared after the existence check above; nothing left to do.
+      }
+    }
+  }
+
+  /**
+   * Delete `path` together with everything underneath it. Does nothing if `path` is absent.
+   *
+   * @param zk started Curator client
+   * @param path absolute ZooKeeper path to remove
+   */
+  def deleteRecursive(zk: CuratorFramework, path: String): Unit = {
+    if (zk.checkExists().forPath(path) != null) {
+      // Descend first: ZooKeeper refuses to delete a node that still has children.
+      for (child <- zk.getChildren.forPath(path)) {
+        deleteRecursive(zk, path + "/" + child)
+      }
+      zk.delete().forPath(path)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 296a076..f4f572e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -36,11 +36,11 @@ import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
-
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
+
 /**
  * Whether to submit, kill, or request the status of an application.
  * The latter two operations are currently supported only for standalone cluster mode.
@@ -114,18 +114,20 @@ object SparkSubmit {
     }
   }
 
-  /** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
+  /**
+   * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
+   */
   private def kill(args: SparkSubmitArguments): Unit = {
-    new StandaloneRestClient()
+    new RestSubmissionClient()
       .killSubmission(args.master, args.submissionToKill)
   }
 
   /**
    * Request the status of an existing submission using the REST protocol.
-   * Standalone cluster mode only.
+   * Standalone and Mesos cluster mode only.
    */
   private def requestStatus(args: SparkSubmitArguments): Unit = {
-    new StandaloneRestClient()
+    new RestSubmissionClient()
       .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
   }
 
@@ -252,6 +254,7 @@ object SparkSubmit {
     }
 
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+    val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
 
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
@@ -294,8 +297,9 @@ object SparkSubmit {
 
     // The following modes are not supported or applicable
     (clusterManager, deployMode) match {
-      case (MESOS, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+      case (MESOS, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on Mesos clusters.")
       case (STANDALONE, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for python " +
           "applications on standalone clusters.")
@@ -377,15 +381,6 @@ object SparkSubmit {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
-      // Standalone cluster only
-      // Do not set CL arguments here because there are multiple possibilities for the main class
-      OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
-      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
-      OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
-        sysProp = "spark.driver.supervise"),
-
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
@@ -413,7 +408,15 @@ object SparkSubmit {
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
-        sysProp = "spark.files")
+        sysProp = "spark.files"),
+      OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
+      OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.driver.memory"),
+      OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.driver.cores"),
+      OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.driver.supervise"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
     )
 
     // In client mode, launch the application main class directly
@@ -452,7 +455,7 @@ object SparkSubmit {
     // All Spark parameters are expected to be passed to the client through system properties.
     if (args.isStandaloneCluster) {
       if (args.useRest) {
-        childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
+        childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
         childArgs += (args.primaryResource, args.mainClass)
       } else {
         // In legacy standalone cluster mode, use Client as a wrapper around the user class
@@ -496,6 +499,15 @@ object SparkSubmit {
       }
     }
 
+    if (isMesosCluster) {
+      assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
+      childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
+      childArgs += (args.primaryResource, args.mainClass)
+      if (args.childArgs != null) {
+        childArgs ++= args.childArgs
+      }
+    }
+
     // Load any properties specified through --conf and the default properties file
     for ((k, v) <- args.sparkProperties) {
       sysProps.getOrElseUpdate(k, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c896842..c621b8f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -241,8 +241,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }
 
   private def validateKillArguments(): Unit = {
-    if (!master.startsWith("spark://")) {
-      SparkSubmit.printErrorAndExit("Killing submissions is only supported in standalone mode!")
+    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
+      SparkSubmit.printErrorAndExit(
+        "Killing submissions is only supported in standalone or Mesos mode!")
     }
     if (submissionToKill == null) {
       SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
@@ -250,9 +251,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }
 
   private def validateStatusRequestArguments(): Unit = {
-    if (!master.startsWith("spark://")) {
+    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
       SparkSubmit.printErrorAndExit(
-        "Requesting submission statuses is only supported in standalone mode!")
+        "Requesting submission statuses is only supported in standalone or Mesos mode!")
     }
     if (submissionToRequestStatusFor == null) {
       SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
@@ -485,6 +486,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |
         | Spark standalone with cluster deploy mode only:
         |  --driver-cores NUM          Cores for driver (Default: 1).
+        |
+        | Spark standalone or Mesos with cluster deploy mode only:
         |  --supervise                 If given, restarts the driver on failure.
         |  --kill SUBMISSION_ID        If given, kills the driver specified.
         |  --status SUBMISSION_ID      If given, requests the status of the driver specified.

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index ff2eed6..1c21c17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -130,7 +130,7 @@ private[master] class Master(
   private val restServer =
     if (restServerEnabled) {
       val port = conf.getInt("spark.master.rest.port", 6066)
-      Some(new StandaloneRestServer(host, port, self, masterUrl, conf))
+      Some(new StandaloneRestServer(host, port, conf, self, masterUrl))
     } else {
       None
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
deleted file mode 100644
index 5b22481..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master
-
-import scala.collection.JavaConversions._
-
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.zookeeper.KeeperException
-
-import org.apache.spark.{Logging, SparkConf}
-
-private[deploy] object SparkCuratorUtil extends Logging {
-
-  private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
-  private val ZK_SESSION_TIMEOUT_MILLIS = 60000
-  private val RETRY_WAIT_MILLIS = 5000
-  private val MAX_RECONNECT_ATTEMPTS = 3
-
-  def newClient(conf: SparkConf): CuratorFramework = {
-    val ZK_URL = conf.get("spark.deploy.zookeeper.url")
-    val zk = CuratorFrameworkFactory.newClient(ZK_URL,
-      ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
-      new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
-    zk.start()
-    zk
-  }
-
-  def mkdir(zk: CuratorFramework, path: String) {
-    if (zk.checkExists().forPath(path) == null) {
-      try {
-        zk.create().creatingParentsIfNeeded().forPath(path)
-      } catch {
-        case nodeExist: KeeperException.NodeExistsException =>
-          // do nothing, ignore node existing exception.
-        case e: Exception => throw e
-      }
-    }
-  }
-
-  def deleteRecursive(zk: CuratorFramework, path: String) {
-    if (zk.checkExists().forPath(path) != null) {
-      for (child <- zk.getChildren.forPath(path)) {
-        zk.delete().forPath(path + "/" + child)
-      }
-      zk.delete().forPath(path)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 4823fd7..52758d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -23,6 +23,7 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
+import org.apache.spark.deploy.SparkCuratorUtil
 
 private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index a285783..80db6d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkCuratorUtil
 
 
 private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
new file mode 100644
index 0000000..5d4e5b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.concurrent.CountDownLatch
+
+import org.apache.spark.deploy.mesos.ui.MesosClusterUI
+import org.apache.spark.deploy.rest.mesos.MesosRestServer
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.util.SignalLogger
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
+/**
+ * A dispatcher that is responsible for managing and launching drivers, and is intended to be
+ * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
+ * the cluster independently of Spark applications.
+ * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
+ * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
+ * for resources.
+ *
+ * A typical new driver lifecycle is the following:
+ * - Driver submitted via spark-submit talking to the [[MesosRestServer]]
+ * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
+ * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
+ *
+ * This dispatcher supports both Mesos fine-grain and coarse-grain mode as the mode is
+ * configurable per driver launched.
+ * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
+ * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
+ * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
+ *
+ * @param args parsed command-line arguments (host, ports)
+ * @param conf configuration shared with the scheduler, REST server and web UI
+ */
+private[mesos] class MesosClusterDispatcher(
+    args: MesosClusterDispatcherArguments,
+    conf: SparkConf)
+  extends Logging {
+
+  // Address handed to the web UI: SPARK_PUBLIC_DNS when set, otherwise the configured host.
+  private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
+  private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", "NONE").toUpperCase()
+  logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
+
+  // Only "NONE" and "ZOOKEEPER" are accepted; any other value fails construction.
+  private val engineFactory = recoveryMode match {
+    case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
+    case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
+    case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
+  }
+
+  private val scheduler = new MesosClusterScheduler(engineFactory, conf)
+
+  private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
+  private val webUi = new MesosClusterUI(
+    new SecurityManager(conf),
+    args.webUiPort,
+    conf,
+    publicAddress,
+    scheduler)
+
+  // Released by stop(); awaitShutdown() blocks on it.
+  private val shutdownLatch = new CountDownLatch(1)
+
+  /**
+   * Bind the web UI, hand its URL to the scheduler as the framework URL, then start the
+   * scheduler and the REST server.
+   */
+  def start(): Unit = {
+    webUi.bind()
+    scheduler.frameworkUrl = webUi.activeWebUiUrl
+    scheduler.start()
+    server.start()
+  }
+
+  /** Block the calling thread until [[stop]] has been called. */
+  def awaitShutdown(): Unit = {
+    shutdownLatch.await()
+  }
+
+  /**
+   * Stop the web UI, the REST server and the scheduler, then release threads blocked in
+   * [[awaitShutdown]]. The latch is released even if one of the components throws while
+   * stopping, so waiting threads are never left blocked forever.
+   */
+  def stop(): Unit = {
+    try {
+      webUi.stop()
+      server.stop()
+      scheduler.stop()
+    } finally {
+      shutdownLatch.countDown()
+    }
+  }
+}
+
+private[mesos] object MesosClusterDispatcher extends Logging {
+  /**
+   * Entry point of the dispatcher process. Parses the command line, builds the configuration,
+   * starts the dispatcher and then blocks until it has been stopped. A JVM shutdown hook
+   * stops the dispatcher.
+   */
+  def main(args: Array[String]): Unit = {
+    SignalLogger.register(log)
+    val sparkConf = new SparkConf
+    val parsedArgs = new MesosClusterDispatcherArguments(args, sparkConf)
+    sparkConf.setMaster(parsedArgs.masterUrl)
+    sparkConf.setAppName(parsedArgs.name)
+    // A ZooKeeper URL on the command line switches the recovery mode to ZOOKEEPER.
+    for (zkUrl <- parsedArgs.zookeeperUrl) {
+      sparkConf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
+      sparkConf.set("spark.mesos.deploy.zookeeper.url", zkUrl)
+    }
+    val dispatcher = new MesosClusterDispatcher(parsedArgs, sparkConf)
+    dispatcher.start()
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        logInfo("Shutdown hook is shutting down dispatcher")
+        dispatcher.stop()
+        dispatcher.awaitShutdown()
+      }
+    })
+    dispatcher.awaitShutdown()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
new file mode 100644
index 0000000..894cb78
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{IntParam, Utils}
+
+
+/**
+ * Command-line arguments of the Mesos cluster dispatcher.
+ *
+ * The arguments are parsed while the instance is constructed. Invalid input prints the
+ * usage text to stderr and terminates the JVM through `System.exit`. After parsing, the
+ * default Spark properties are loaded into `conf` and `propertiesFile` is replaced by the
+ * value returned from `Utils.loadDefaultSparkProperties`.
+ *
+ * @param args raw command-line arguments
+ * @param conf Spark configuration that receives the loaded default properties
+ */
+private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
+  var host = Utils.localHostName()
+  var port = 7077
+  var name = "Spark Cluster"
+  var webUiPort = 8081
+  var masterUrl: String = _
+  var zookeeperUrl: Option[String] = None
+  var propertiesFile: String = _
+
+  parse(args.toList)
+
+  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
+  /**
+   * Consume the argument list option by option, updating the fields above.
+   * An unknown option, or an option with a missing or malformed value, falls through to
+   * the last case and prints the usage text.
+   */
+  private def parse(args: List[String]): Unit = args match {
+    case ("--host" | "-h") :: value :: tail =>
+      Utils.checkHost(value, "Please use hostname " + value)
+      host = value
+      parse(tail)
+
+    case ("--port" | "-p") :: IntParam(value) :: tail =>
+      port = value
+      parse(tail)
+
+    // Only the long form exists for the web UI port: "-p" belongs to --port above, so
+    // listing it here as well could never match.
+    case ("--webui-port") :: IntParam(value) :: tail =>
+      webUiPort = value
+      parse(tail)
+
+    case ("--zk" | "-z") :: value :: tail =>
+      zookeeperUrl = Some(value)
+      parse(tail)
+
+    case ("--master" | "-m") :: value :: tail =>
+      if (!value.startsWith("mesos://")) {
+        System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
+        System.exit(1)
+      }
+      // The scheme prefix is dropped; only the remainder is stored
+      masterUrl = value.stripPrefix("mesos://")
+      parse(tail)
+
+    case ("--name") :: value :: tail =>
+      name = value
+      parse(tail)
+
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
+    case ("--help") :: tail =>
+      printUsageAndExit(0)
+
+    // End of input: the master URL is the only mandatory option
+    case Nil => {
+      if (masterUrl == null) {
+        System.err.println("--master is required")
+        printUsageAndExit(1)
+      }
+    }
+
+    case _ =>
+      printUsageAndExit(1)
+  }
+
+  /** Print the usage text to stderr and terminate the JVM with the given exit code. */
+  private def printUsageAndExit(exitCode: Int): Unit = {
+    System.err.println(
+      "Usage: MesosClusterDispatcher [options]\n" +
+        "\n" +
+        "Options:\n" +
+        "  -h HOST, --host HOST    Hostname to listen on\n" +
+        "  -p PORT, --port PORT    Port to listen on (default: 7077)\n" +
+        "  --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
+        "  --name NAME             Framework name to show in Mesos UI\n" +
+        "  -m --master MASTER      URI for connecting to Mesos master\n" +
+        "  -z --zk ZOOKEEPER       Comma delimited URLs for connecting to \n" +
+        "                          Zookeeper for persistence\n" +
+        "  --properties-file FILE  Path to a custom Spark properties file.\n" +
+        "                          Default is conf/spark-defaults.conf.")
+    System.exit(exitCode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
new file mode 100644
index 0000000..1948226
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.Date
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
+
+/**
+ * Immutable description of a Spark driver handed over by the
+ * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]] so that the
+ * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]] can launch it.
+ * @param name Name associated with the driver submission
+ * @param jarUrl URL to the application jar
+ * @param mem Amount of memory for the driver
+ * @param cores Number of cores for the driver
+ * @param supervise Supervise the driver for long running app
+ * @param command The command to launch the driver.
+ * @param schedulerProperties Extra properties to pass the Mesos scheduler
+ * @param submissionId Identifier of this submission
+ * @param submissionDate Date of the submission
+ * @param retryState Retry bookkeeping for the driver, if any (None by default)
+ */
+private[spark] class MesosDriverDescription(
+    val name: String,
+    val jarUrl: String,
+    val mem: Int,
+    val cores: Double,
+    val supervise: Boolean,
+    val command: Command,
+    val schedulerProperties: Map[String, String],
+    val submissionId: String,
+    val submissionDate: Date,
+    val retryState: Option[MesosClusterRetryState] = None)
+  extends Serializable {
+
+  /**
+   * Build a new description in which every field that is not passed explicitly keeps
+   * the value of this instance.
+   */
+  def copy(
+      name: String = name,
+      jarUrl: String = jarUrl,
+      mem: Int = mem,
+      cores: Double = cores,
+      supervise: Boolean = supervise,
+      command: Command = command,
+      schedulerProperties: Map[String, String] = schedulerProperties,
+      submissionId: String = submissionId,
+      submissionDate: Date = submissionDate,
+      retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
+    new MesosDriverDescription(
+      name = name,
+      jarUrl = jarUrl,
+      mem = mem,
+      cores = cores,
+      supervise = supervise,
+      command = command,
+      schedulerProperties = schedulerProperties,
+      submissionId = submissionId,
+      submissionDate = submissionDate,
+      retryState = retryState)
+  }
+
+  /** Short textual form that only mentions the main class of the launch command. */
+  override def toString: String = "MesosDriverDescription (" + command.mainClass + ")"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
new file mode 100644
index 0000000..7b2005e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.mesos.Protos.TaskStatus
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+/**
+ * Root page of the Mesos cluster dispatcher web UI. It lists the queued, launched and
+ * finished drivers as well as the drivers waiting for a retry, as reported by the
+ * scheduler of the parent UI.
+ */
+private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+  /** Render the page from the current scheduler state; the request itself is not inspected. */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val state = parent.scheduler.getSchedulerState()
+    val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
+    val driverHeaders = queuedHeaders ++
+      Seq("Start Date", "Mesos Slave ID", "State")
+    val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
+      Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
+    val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
+    val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
+    val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
+    val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers)
+    val content =
+      <p>Mesos Framework ID: {state.frameworkId}</p>
+      <div class="row-fluid">
+        <div class="span12">
+          <h4>Queued Drivers:</h4>
+          {queuedTable}
+          <h4>Launched Drivers:</h4>
+          {launchedTable}
+          <h4>Finished Drivers:</h4>
+          {finishedTable}
+          <h4>Supervise drivers waiting for retry:</h4>
+          {retryTable}
+        </div>
+      </div>;
+    UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster")
+  }
+
+  /** Table row for a driver that is queued but not launched yet. */
+  private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
+    <tr>
+      <td>{submission.submissionId}</td>
+      <td>{submission.submissionDate}</td>
+      <td>{submission.command.mainClass}</td>
+      <td>cpus: {submission.cores}, mem: {submission.mem}</td>
+    </tr>
+  }
+
+  /** Table row for a launched or finished driver. */
+  private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
+    <tr>
+      <td>{state.driverDescription.submissionId}</td>
+      <td>{state.driverDescription.submissionDate}</td>
+      <td>{state.driverDescription.command.mainClass}</td>
+      <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
+      <td>{state.startDate}</td>
+      <td>{state.slaveId.getValue}</td>
+      <td>{stateString(state.mesosTaskStatus)}</td>
+    </tr>
+  }
+
+  /**
+   * Table row for a driver that is waiting to be retried.
+   * If the description unexpectedly carries no retry state, the three retry columns are
+   * left empty instead of failing the rendering of the whole page.
+   */
+  private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
+    submission.retryState match {
+      case Some(retry) =>
+        <tr>
+          <td>{submission.submissionId}</td>
+          <td>{submission.submissionDate}</td>
+          <td>{submission.command.mainClass}</td>
+          <td>{retry.lastFailureStatus}</td>
+          <td>{retry.nextRetry}</td>
+          <td>{retry.retries}</td>
+        </tr>
+      case None =>
+        <tr>
+          <td>{submission.submissionId}</td>
+          <td>{submission.submissionDate}</td>
+          <td>{submission.command.mainClass}</td>
+          <td></td>
+          <td></td>
+          <td></td>
+        </tr>
+    }
+  }
+
+  /**
+   * Format a task status as a comma separated list. The state is always included;
+   * message, health, source, reason and timestamp are included only when present on the
+   * status. Returns the empty string when there is no status.
+   */
+  private def stateString(status: Option[TaskStatus]): String = {
+    status.map { s =>
+      val parts = Seq(
+        Some(s"State: ${s.getState}"),
+        if (s.hasMessage) Some(s"Message: ${s.getMessage}") else None,
+        if (s.hasHealthy) Some(s"Healthy: ${s.getHealthy}") else None,
+        if (s.hasSource) Some(s"Source: ${s.getSource}") else None,
+        if (s.hasReason) Some(s"Reason: ${s.getReason}") else None,
+        if (s.hasTimestamp) Some(s"Time: ${s.getTimestamp}") else None)
+      parts.flatten.mkString(", ")
+    }.getOrElse("")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
new file mode 100644
index 0000000..4865d46
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.ui.{SparkUI, WebUI}
+
+/**
+ * Web UI of the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]]; it serves the
+ * driver overview page together with the static resources.
+ */
+private[spark] class MesosClusterUI(
+    securityManager: SecurityManager,
+    port: Int,
+    conf: SparkConf,
+    dispatcherPublicAddress: String,
+    val scheduler: MesosClusterScheduler)
+  extends WebUI(securityManager, port, conf) {
+
+  initialize()
+
+  /** URL of this UI, built from the public address of the dispatcher and the bound port. */
+  def activeWebUiUrl: String = s"http://$dispatcherPublicAddress:$boundPort"
+
+  /** Attach the driver overview page and the handler for static resources. */
+  override def initialize(): Unit = {
+    attachPage(new MesosClusterPage(this))
+    val staticHandler = createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static")
+    attachHandler(staticHandler)
+  }
+}
+
+private object MesosClusterUI {
+  // Same static resource directory as the one used by SparkUI
+  val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
new file mode 100644
index 0000000..307cebf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.io.{DataOutputStream, FileNotFoundException}
+import java.net.{HttpURLConnection, SocketException, URL}
+import javax.servlet.http.HttpServletResponse
+
+import scala.io.Source
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import com.google.common.base.Charsets
+
+import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils
+
+/**
+ * A client that submits applications to a [[RestSubmissionServer]].
+ *
+ * In protocol version v1, the REST URL takes the form http://[host:port]/v1/submissions/[action],
+ * where [action] can be one of create, kill, or status. Each type of request is represented in
+ * an HTTP message sent to the following prefixes:
+ *   (1) submit - POST to /submissions/create
+ *   (2) kill - POST /submissions/kill/[submissionId]
+ *   (3) status - GET /submissions/status/[submissionId]
+ *
+ * In the case of (1), parameters are posted in the HTTP body in the form of JSON fields.
+ * Otherwise, the URL fully specifies the intended action of the client.
+ *
+ * Since the protocol is expected to be stable across Spark versions, existing fields cannot be
+ * changed or removed, though new optional fields can be added. In the rare event that forward or
+ * backward compatibility is broken, Spark must introduce a new protocol version (e.g. v2).
+ *
+ * The client and the server must communicate using the same version of the protocol. If there
+ * is a mismatch, the server will respond with the highest protocol version it supports. A future
+ * implementation of this client can use that information to retry using the version specified
+ * by the server.
+ */
+private[spark] class RestSubmissionClient extends Logging {
+  import RestSubmissionClient._
+
+  // Master URL schemes this client accepts; the scheme is replaced by http:// for REST calls
+  private val supportedMasterPrefixes = Seq("spark://", "mesos://")
+
+  /**
+   * Submit an application specified by the parameters in the provided request.
+   *
+   * If the submission was successful, poll the status of the submission and report
+   * it to the user. Otherwise, report the error message provided by the server.
+   */
+  def createSubmission(
+      master: String,
+      request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request to launch an application in $master.")
+    validateMaster(master)
+    val url = getSubmitUrl(master)
+    val response = postJson(url, request.toJson)
+    response match {
+      case s: CreateSubmissionResponse =>
+        reportSubmissionStatus(master, s)
+        handleRestResponse(s)
+      case unexpected =>
+        handleUnexpectedRestResponse(unexpected)
+    }
+    response
+  }
+
+  /** Request that the server kill the specified submission. */
+  def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request to kill submission $submissionId in $master.")
+    validateMaster(master)
+    val response = post(getKillUrl(master, submissionId))
+    response match {
+      case k: KillSubmissionResponse => handleRestResponse(k)
+      case unexpected => handleUnexpectedRestResponse(unexpected)
+    }
+    response
+  }
+
+  /**
+   * Request the status of a submission from the server.
+   * When `quiet` is set, a well-formed status response is not logged.
+   */
+  def requestSubmissionStatus(
+      master: String,
+      submissionId: String,
+      quiet: Boolean = false): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
+    validateMaster(master)
+    val response = get(getStatusUrl(master, submissionId))
+    response match {
+      case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
+      case unexpected => handleUnexpectedRestResponse(unexpected)
+    }
+    response
+  }
+
+  /** Construct a message that captures the specified parameters for submitting an application. */
+  def constructSubmitRequest(
+      appResource: String,
+      mainClass: String,
+      appArgs: Array[String],
+      sparkProperties: Map[String, String],
+      environmentVariables: Map[String, String]): CreateSubmissionRequest = {
+    val message = new CreateSubmissionRequest
+    message.clientSparkVersion = sparkVersion
+    message.appResource = appResource
+    message.mainClass = mainClass
+    message.appArgs = appArgs
+    message.sparkProperties = sparkProperties
+    message.environmentVariables = environmentVariables
+    message.validate()
+    message
+  }
+
+  /** Send a GET request to the specified URL. */
+  private def get(url: URL): SubmitRestProtocolResponse = {
+    logDebug(s"Sending GET request to server at $url.")
+    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+    conn.setRequestMethod("GET")
+    readResponse(conn)
+  }
+
+  /** Send a POST request to the specified URL. */
+  private def post(url: URL): SubmitRestProtocolResponse = {
+    logDebug(s"Sending POST request to server at $url.")
+    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+    conn.setRequestMethod("POST")
+    readResponse(conn)
+  }
+
+  /** Send a POST request with the given JSON as the body to the specified URL. */
+  private def postJson(url: URL, json: String): SubmitRestProtocolResponse = {
+    logDebug(s"Sending POST request to server at $url:\n$json")
+    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+    conn.setRequestMethod("POST")
+    conn.setRequestProperty("Content-Type", "application/json")
+    conn.setRequestProperty("charset", "utf-8")
+    conn.setDoOutput(true)
+    val out = new DataOutputStream(conn.getOutputStream)
+    Utils.tryWithSafeFinally {
+      out.write(json.getBytes(Charsets.UTF_8))
+    } {
+      out.close()
+    }
+    readResponse(conn)
+  }
+
+  /**
+   * Read the response from the server and return it as a validated [[SubmitRestProtocolResponse]].
+   * If the response represents an error, report the embedded message to the user.
+   * Exposed for testing.
+   *
+   * The body is decoded as UTF-8, matching the encoding used for request bodies, and the
+   * response stream is closed once it has been read.
+   *
+   * Throws a SubmitRestConnectionException if the server cannot be reached and a
+   * SubmitRestProtocolException if the response is empty, malformed, or not a response.
+   */
+  private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
+    try {
+      val dataStream =
+        if (connection.getResponseCode == HttpServletResponse.SC_OK) {
+          connection.getInputStream
+        } else {
+          connection.getErrorStream
+        }
+      // If the server threw an exception while writing a response, it will not have a body
+      if (dataStream == null) {
+        throw new SubmitRestProtocolException("Server returned empty body")
+      }
+      // Decode explicitly as UTF-8 rather than with the platform default charset
+      val source = Source.fromInputStream(dataStream, Charsets.UTF_8.name())
+      val responseJson = Utils.tryWithSafeFinally {
+        source.mkString
+      } {
+        source.close()
+      }
+      logDebug(s"Response from the server:\n$responseJson")
+      val response = SubmitRestProtocolMessage.fromJson(responseJson)
+      response.validate()
+      response match {
+        // If the response is an error, log the message
+        case error: ErrorResponse =>
+          logError(s"Server responded with error:\n${error.message}")
+          error
+        // Otherwise, simply return the response
+        case response: SubmitRestProtocolResponse => response
+        case unexpected =>
+          throw new SubmitRestProtocolException(
+            s"Message received from server was not a response:\n${unexpected.toJson}")
+      }
+    } catch {
+      case unreachable @ (_: FileNotFoundException | _: SocketException) =>
+        throw new SubmitRestConnectionException(
+          s"Unable to connect to server ${connection.getURL}", unreachable)
+      case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
+        throw new SubmitRestProtocolException(
+          "Malformed response received from server", malformed)
+    }
+  }
+
+  /** Return the REST URL for creating a new submission. */
+  private def getSubmitUrl(master: String): URL = {
+    val baseUrl = getBaseUrl(master)
+    new URL(s"$baseUrl/create")
+  }
+
+  /** Return the REST URL for killing an existing submission. */
+  private def getKillUrl(master: String, submissionId: String): URL = {
+    val baseUrl = getBaseUrl(master)
+    new URL(s"$baseUrl/kill/$submissionId")
+  }
+
+  /** Return the REST URL for requesting the status of an existing submission. */
+  private def getStatusUrl(master: String, submissionId: String): URL = {
+    val baseUrl = getBaseUrl(master)
+    new URL(s"$baseUrl/status/$submissionId")
+  }
+
+  /** Return the base URL for communicating with the server, including the protocol version. */
+  private def getBaseUrl(master: String): String = {
+    // Drop the supported scheme prefix (if present) and a trailing slash
+    val masterUrl = supportedMasterPrefixes
+      .find { prefix => master.startsWith(prefix) }
+      .map { prefix => master.stripPrefix(prefix) }
+      .getOrElse(master)
+      .stripSuffix("/")
+    s"http://$masterUrl/$PROTOCOL_VERSION/submissions"
+  }
+
+  /** Throw an exception if the master URL does not start with one of the supported prefixes. */
+  private def validateMaster(master: String): Unit = {
+    val valid = supportedMasterPrefixes.exists { prefix => master.startsWith(prefix) }
+    if (!valid) {
+      throw new IllegalArgumentException(
+        "This REST client only supports master URLs that start with " +
+          "one of the following: " + supportedMasterPrefixes.mkString(","))
+    }
+  }
+
+  /** Report the status of a newly created submission. */
+  private def reportSubmissionStatus(
+      master: String,
+      submitResponse: CreateSubmissionResponse): Unit = {
+    if (submitResponse.success) {
+      val submissionId = submitResponse.submissionId
+      if (submissionId != null) {
+        logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
+        pollSubmissionStatus(master, submissionId)
+      } else {
+        // should never happen
+        logError("Application successfully submitted, but submission ID was not provided!")
+      }
+    } else {
+      val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
+      logError(s"Application submission failed$failMessage")
+    }
+  }
+
+  /**
+   * Poll the status of the specified submission and log it.
+   * This retries up to a fixed number of times before giving up.
+   */
+  private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
+    // Note: the `return` statements below leave the whole method, not just the closure
+    (1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
+      val response = requestSubmissionStatus(master, submissionId, quiet = true)
+      val statusResponse = response match {
+        case s: SubmissionStatusResponse => s
+        case _ => return // unexpected type, let upstream caller handle it
+      }
+      if (statusResponse.success) {
+        val driverState = Option(statusResponse.driverState)
+        val workerId = Option(statusResponse.workerId)
+        val workerHostPort = Option(statusResponse.workerHostPort)
+        val exception = Option(statusResponse.message)
+        // Log driver state, if present
+        driverState match {
+          case Some(state) => logInfo(s"State of driver $submissionId is now $state.")
+          case _ => logError(s"State of driver $submissionId was not found!")
+        }
+        // Log worker node, if present
+        (workerId, workerHostPort) match {
+          case (Some(id), Some(hp)) => logInfo(s"Driver is running on worker $id at $hp.")
+          case _ =>
+        }
+        // Log exception stack trace, if present
+        exception.foreach { e => logError(e) }
+        return
+      }
+      Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
+    }
+    logError(s"Error: Master did not recognize driver $submissionId.")
+  }
+
+  /** Log the response sent by the server in the REST application submission protocol. */
+  private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = {
+    logInfo(s"Server responded with ${response.messageType}:\n${response.toJson}")
+  }
+
+  /** Log an appropriate error if the response sent by the server is not of the expected type. */
+  private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
+    logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
+  }
+}
+
+private[spark] object RestSubmissionClient {
+  // Delay between two status polls, passed to Thread.sleep (milliseconds)
+  private val REPORT_DRIVER_STATUS_INTERVAL = 1000
+  // Maximum number of status polls after a successful submission
+  private val REPORT_DRIVER_STATUS_MAX_TRIES = 10
+  val PROTOCOL_VERSION = "v1"
+
+  /**
+   * Submit an application, assuming Spark parameters are specified through the given config.
+   * This is abstracted to its own method for testing purposes.
+   *
+   * Only environment variables whose name starts with "SPARK_" are forwarded.
+   * Throws an IllegalArgumentException if 'spark.master' is not set in the config.
+   */
+  def run(
+      appResource: String,
+      mainClass: String,
+      appArgs: Array[String],
+      conf: SparkConf,
+      env: Map[String, String] = sys.env): SubmitRestProtocolResponse = {
+    val master = conf.getOption("spark.master").getOrElse {
+      throw new IllegalArgumentException("'spark.master' must be set.")
+    }
+    val sparkProperties = conf.getAll.toMap
+    val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
+    val client = new RestSubmissionClient
+    val submitRequest = client.constructSubmitRequest(
+      appResource, mainClass, appArgs, sparkProperties, environmentVariables)
+    client.createSubmission(master, submitRequest)
+  }
+
+  /**
+   * Command-line entry point. Expects the application resource and the main class,
+   * optionally followed by the application arguments.
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      // sys.error would throw and make the exit call unreachable, so print the usage
+      // to stderr and terminate with a non-zero exit code instead.
+      System.err.println("Usage: RestSubmissionClient [app resource] [main class] [app args*]")
+      sys.exit(1)
+    }
+    val appResource = args(0)
+    val mainClass = args(1)
+    val appArgs = args.slice(2, args.length)
+    val conf = new SparkConf
+    run(appResource, mainClass, appArgs, conf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
new file mode 100644
index 0000000..2e78d03
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest
+
+import java.net.InetSocketAddress
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
+
+import scala.io.Source
+import com.fasterxml.jackson.core.JsonProcessingException
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
+import org.eclipse.jetty.util.thread.QueuedThreadPool
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils
+
+/**
+ * A server that responds to requests submitted by the [[RestSubmissionClient]].
+ *
+ * This server responds with different HTTP codes depending on the situation:
+ *   200 OK - Request was processed successfully
+ *   400 BAD REQUEST - Request was malformed, not successfully validated, or of unexpected type
+ *   468 UNKNOWN PROTOCOL VERSION - Request specified a protocol this server does not understand
+ *   500 INTERNAL SERVER ERROR - Server throws an exception internally while processing the request
+ *
+ * The server always includes a JSON representation of the relevant [[SubmitRestProtocolResponse]]
+ * in the HTTP body. If an error occurs, however, the server will include an [[ErrorResponse]]
+ * instead of the one expected by the client. If the construction of this error response itself
+ * fails, the response will consist of an empty body with a response code that indicates internal
+ * server error.
+ */
+private[spark] abstract class RestSubmissionServer(
+    val host: String,
+    val requestedPort: Int,
+    val masterConf: SparkConf) extends Logging {
+  protected val submitRequestServlet: SubmitRequestServlet
+  protected val killRequestServlet: KillRequestServlet
+  protected val statusRequestServlet: StatusRequestServlet
+
+  private var _server: Option[Server] = None
+
+  // Common URL prefix of all submission endpoints, i.e. /[protocol version]/submissions
+  protected val baseContext = s"/${RestSubmissionServer.PROTOCOL_VERSION}/submissions"
+  // URL patterns mapped to the servlets that serve them. Exposed for testing.
+  protected lazy val contextToServlet = Map[String, RestServlet](
+    s"$baseContext/create/*" -> submitRequestServlet,
+    s"$baseContext/kill/*" -> killRequestServlet,
+    s"$baseContext/status/*" -> statusRequestServlet,
+    "/*" -> new ErrorServlet // default handler
+  )
+
+  /** Start the server, remember it for `stop()`, and return the port it is bound to. */
+  def start(): Int = {
+    val (started, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
+    _server = Some(started)
+    logInfo(s"Started REST server for submitting applications on port $boundPort")
+    boundPort
+  }
+
+  /**
+   * Build a server listening on `host:startPort`, register every servlet from
+   * `contextToServlet` under the root context path, and start it.
+   * Return a 2-tuple of the started server and the bound port.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
+    val jetty = new Server(new InetSocketAddress(host, startPort))
+    val pool = new QueuedThreadPool
+    pool.setDaemon(true)
+    jetty.setThreadPool(pool)
+    val rootHandler = new ServletContextHandler
+    rootHandler.setContextPath("/")
+    for ((urlPattern, servlet) <- contextToServlet) {
+      rootHandler.addServlet(new ServletHolder(servlet), urlPattern)
+    }
+    jetty.setHandler(rootHandler)
+    jetty.start()
+    (jetty, jetty.getConnectors()(0).getLocalPort)
+  }
+
+  /** Stop the server if `start()` has been called; otherwise do nothing. */
+  def stop(): Unit = _server.foreach(_.stop())
+}
+
+private[rest] object RestSubmissionServer {
+  val PROTOCOL_VERSION = RestSubmissionClient.PROTOCOL_VERSION // same value as the client's
+  val SC_UNKNOWN_PROTOCOL_VERSION = 468 // status code set by ErrorServlet on version mismatch
+}
+
+/**
+ * An abstract servlet for handling requests passed to the [[RestSubmissionServer]].
+ */
+private[rest] abstract class RestServlet extends HttpServlet with Logging {
+
+  /**
+   * Write the given response message to the client as JSON with a utf-8 character encoding.
+   * The message is validated first; if validation fails, an error response is sent instead.
+   */
+  protected def sendResponse(
+      responseMessage: SubmitRestProtocolResponse,
+      responseServlet: HttpServletResponse): Unit = {
+    val validated = validateResponse(responseMessage, responseServlet)
+    responseServlet.setContentType("application/json")
+    responseServlet.setCharacterEncoding("utf-8")
+    responseServlet.getWriter.write(validated.toJson)
+  }
+
+  /**
+   * Return the names of fields in the client request that the server does not know about.
+   *
+   * The server re-serializes the message it parsed and diffs the client's JSON against
+   * that output. Any fields that appear only in the client JSON are treated as unexpected.
+   *
+   * @return the unknown top-level field names, or an empty array if there are none
+   */
+  protected def findUnknownFields(
+      requestJson: String,
+      requestMessage: SubmitRestProtocolMessage): Array[String] = {
+    // The third component of the diff is what exists on the left (client) side only
+    val Diff(_, _, onlyInClient) = parse(requestJson).diff(parse(requestMessage.toJson))
+    onlyInClient match {
+      case JObject(fields) => fields.map { case (name, _) => name }.toArray
+      case _ => Array.empty[String] // No difference
+    }
+  }
+
+  /**
+   * Return a human readable String representation of the exception: its `toString`
+   * on the first line, followed by one tab-indented line per stack trace element.
+   */
+  protected def formatException(e: Throwable): String =
+    e.getStackTrace.map(frame => s"\t$frame").mkString(s"$e\n", "\n", "")
+
+  /** Build an [[ErrorResponse]] carrying the given message and this server's Spark version. */
+  protected def handleError(message: String): ErrorResponse = {
+    val error = new ErrorResponse
+    error.serverSparkVersion = sparkVersion
+    error.message = message
+    error
+  }
+
+  /**
+   * Extract the submission ID, taken to be the first segment of the given relative path.
+   * A path is expected to look like /[submission ID]/maybe/something/else.
+   * Return None if the path is null or empty, or if its first segment is empty.
+   */
+  protected def parseSubmissionId(path: String): Option[String] =
+    Option(path)
+      .filter(_.nonEmpty)
+      .flatMap(_.stripPrefix("/").split("/").headOption)
+      .filter(_.nonEmpty)
+
+  /**
+   * Check that the response message is correctly constructed before it is sent.
+   *
+   * A valid message is returned unchanged. If validation throws, the status is set to
+   * internal server error and an error response describing the exception is returned.
+   */
+  private def validateResponse(
+      responseMessage: SubmitRestProtocolResponse,
+      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+    try {
+      responseMessage.validate()
+      responseMessage
+    } catch {
+      case invalid: Exception =>
+        // Surface the failure to the client as a 500 rather than send the broken message
+        responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
+        handleError(s"Internal server error: ${formatException(invalid)}")
+    }
+  }
+}
+
+/**
+ * A servlet for handling kill requests passed to the [[RestSubmissionServer]].
+ */
+private[rest] abstract class KillRequestServlet extends RestServlet {
+
+  /**
+   * Handle a POST by passing the submission ID parsed from the path to `handleKill`.
+   * If no ID can be parsed, respond with 400 BAD REQUEST and an error message instead.
+   */
+  protected override def doPost(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    val reply = parseSubmissionId(request.getPathInfo).fold[SubmitRestProtocolResponse] {
+      response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+      handleError("Submission ID is missing in kill request.")
+    }(handleKill)
+    sendResponse(reply, response)
+  }
+
+  /** Produce the kill response for the given submission ID; implemented by subclasses. */
+  protected def handleKill(submissionId: String): KillSubmissionResponse
+}
+
+/**
+ * A servlet for handling status requests passed to the [[RestSubmissionServer]].
+ */
+private[rest] abstract class StatusRequestServlet extends RestServlet {
+
+  /**
+   * Handle a GET by passing the submission ID parsed from the path to `handleStatus`.
+   * If no ID can be parsed, respond with 400 BAD REQUEST and an error message instead.
+   */
+  protected override def doGet(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    val reply = parseSubmissionId(request.getPathInfo).fold[SubmitRestProtocolResponse] {
+      response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+      handleError("Submission ID is missing in status request.")
+    }(handleStatus)
+    sendResponse(reply, response)
+  }
+
+  /** Produce the status response for the given submission ID; implemented by subclasses. */
+  protected def handleStatus(submissionId: String): SubmissionStatusResponse
+}
+
+/**
+ * A servlet for handling submit requests passed to the [[RestSubmissionServer]].
+ */
+private[rest] abstract class SubmitRequestServlet extends RestServlet {
+
+  /**
+   * Submit an application to the Master with parameters specified in the request.
+   * The request body is decoded as UTF-8 and is assumed to be a [[SubmitRestProtocolRequest]]
+   * in the form of JSON. If the request is successfully processed, return an appropriate
+   * response to the client indicating so. Otherwise, return error instead.
+   */
+  protected override def doPost(
+      requestServlet: HttpServletRequest,
+      responseServlet: HttpServletResponse): Unit = {
+    val responseMessage =
+      try {
+        // Decode as UTF-8, the encoding sendResponse uses, not the platform default charset
+        val body = Source.fromInputStream(requestServlet.getInputStream, "UTF-8").mkString
+        val requestMessage = SubmitRestProtocolMessage.fromJson(body)
+        // The client should already have validated the request; re-check to avoid NPEs
+        requestMessage.validate()
+        handleSubmit(body, requestMessage, responseServlet)
+      } catch {
+        // The client failed to provide a valid JSON, so this is not our fault
+        case e @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
+          responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+          handleError("Malformed request: " + formatException(e))
+      }
+    sendResponse(responseMessage, responseServlet)
+  }
+
+  /** Handle a submit request that `doPost` has already parsed and validated. */
+  protected def handleSubmit(
+      requestMessageJson: String,
+      requestMessage: SubmitRestProtocolMessage,
+      responseServlet: HttpServletResponse): SubmitRestProtocolResponse
+}
+
+/**
+ * A default servlet that handles error cases that are not captured by other servlets.
+ */
+private class ErrorServlet extends RestServlet {
+  private val serverVersion = RestSubmissionServer.PROTOCOL_VERSION
+
+  /**
+   * Service a faulty request by returning an appropriate error message to the client.
+   * Responds with 468 for an unknown protocol version and 400 for any other bad path.
+   */
+  protected override def service(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    // getPathInfo is null when the request carries no extra path information
+    val path = Option(request.getPathInfo).getOrElse("")
+    val parts = path.stripPrefix("/").split("/").filter(_.nonEmpty).toList
+    val (reason, versionMismatch) = parts match {
+      case Nil =>
+        // http://host:port/
+        ("Missing protocol version.", false)
+      case `serverVersion` :: "submissions" :: _ =>
+        // http://host:port/correct-version/submissions/*
+        ("Missing an action: please specify one of /create, /kill, or /status.", false)
+      case `serverVersion` :: _ =>
+        // http://host:port/correct-version[/anything-but-submissions/*]; the version is
+        // fine here, so this must not fall through to the unknown-version case below
+        ("Missing the /submissions prefix.", false)
+      case unknownVersion :: _ =>
+        // http://host:port/unknown-version/*
+        (s"Unknown protocol version '$unknownVersion'.", true)
+    }
+    val msg =
+      s"$reason Please submit requests through http://[host]:[port]/$serverVersion/submissions/..."
+    val error = handleError(msg)
+    // If there is a version mismatch, include the highest protocol version that
+    // this server supports in case the client wants to retry with our version
+    if (versionMismatch) {
+      error.highestProtocolVersion = serverVersion
+      response.setStatus(RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION)
+    } else {
+      response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+    }
+    sendResponse(error, response)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/53befacc/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
deleted file mode 100644
index b8fd406..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.rest
-
-import java.io.{DataOutputStream, FileNotFoundException}
-import java.net.{HttpURLConnection, SocketException, URL}
-import javax.servlet.http.HttpServletResponse
-
-import scala.io.Source
-
-import com.fasterxml.jackson.core.JsonProcessingException
-import com.google.common.base.Charsets
-
-import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
-import org.apache.spark.util.Utils
-
-/**
- * A client that submits applications to the standalone Master using a REST protocol.
- * This client is intended to communicate with the [[StandaloneRestServer]] and is
- * currently used for cluster mode only.
- *
- * In protocol version v1, the REST URL takes the form http://[host:port]/v1/submissions/[action],
- * where [action] can be one of create, kill, or status. Each type of request is represented in
- * an HTTP message sent to the following prefixes:
- *   (1) submit - POST to /submissions/create
- *   (2) kill - POST /submissions/kill/[submissionId]
- *   (3) status - GET /submissions/status/[submissionId]
- *
- * In the case of (1), parameters are posted in the HTTP body in the form of JSON fields.
- * Otherwise, the URL fully specifies the intended action of the client.
- *
- * Since the protocol is expected to be stable across Spark versions, existing fields cannot be
- * added or removed, though new optional fields can be added. In the rare event that forward or
- * backward compatibility is broken, Spark must introduce a new protocol version (e.g. v2).
- *
- * The client and the server must communicate using the same version of the protocol. If there
- * is a mismatch, the server will respond with the highest protocol version it supports. A future
- * implementation of this client can use that information to retry using the version specified
- * by the server.
- */
-private[deploy] class StandaloneRestClient extends Logging {
-  import StandaloneRestClient._
-
-  /**
-   * Submit an application specified by the parameters in the provided request.
-   *
-   * If the submission was successful, poll the status of the submission and report
-   * it to the user. Otherwise, report the error message provided by the server.
-   */
-  private[rest] def createSubmission(
-      master: String,
-      request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
-    logInfo(s"Submitting a request to launch an application in $master.")
-    validateMaster(master)
-    val url = getSubmitUrl(master)
-    val response = postJson(url, request.toJson)
-    response match {
-      case s: CreateSubmissionResponse =>
-        reportSubmissionStatus(master, s)
-        handleRestResponse(s)
-      case unexpected =>
-        handleUnexpectedRestResponse(unexpected)
-    }
-    response
-  }
-
-  /** Request that the server kill the specified submission. */
-  def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
-    logInfo(s"Submitting a request to kill submission $submissionId in $master.")
-    validateMaster(master)
-    val response = post(getKillUrl(master, submissionId))
-    response match {
-      case k: KillSubmissionResponse => handleRestResponse(k)
-      case unexpected => handleUnexpectedRestResponse(unexpected)
-    }
-    response
-  }
-
-  /** Request the status of a submission from the server. */
-  def requestSubmissionStatus(
-      master: String,
-      submissionId: String,
-      quiet: Boolean = false): SubmitRestProtocolResponse = {
-    logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
-    validateMaster(master)
-    val response = get(getStatusUrl(master, submissionId))
-    response match {
-      case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
-      case unexpected => handleUnexpectedRestResponse(unexpected)
-    }
-    response
-  }
-
-  /** Construct a message that captures the specified parameters for submitting an application. */
-  private[rest] def constructSubmitRequest(
-      appResource: String,
-      mainClass: String,
-      appArgs: Array[String],
-      sparkProperties: Map[String, String],
-      environmentVariables: Map[String, String]): CreateSubmissionRequest = {
-    val message = new CreateSubmissionRequest
-    message.clientSparkVersion = sparkVersion
-    message.appResource = appResource
-    message.mainClass = mainClass
-    message.appArgs = appArgs
-    message.sparkProperties = sparkProperties
-    message.environmentVariables = environmentVariables
-    message.validate()
-    message
-  }
-
-  /** Send a GET request to the specified URL. */
-  private def get(url: URL): SubmitRestProtocolResponse = {
-    logDebug(s"Sending GET request to server at $url.")
-    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
-    conn.setRequestMethod("GET")
-    readResponse(conn)
-  }
-
-  /** Send a POST request to the specified URL. */
-  private def post(url: URL): SubmitRestProtocolResponse = {
-    logDebug(s"Sending POST request to server at $url.")
-    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
-    conn.setRequestMethod("POST")
-    readResponse(conn)
-  }
-
-  /** Send a POST request with the given JSON as the body to the specified URL. */
-  private def postJson(url: URL, json: String): SubmitRestProtocolResponse = {
-    logDebug(s"Sending POST request to server at $url:\n$json")
-    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
-    conn.setRequestMethod("POST")
-    conn.setRequestProperty("Content-Type", "application/json")
-    conn.setRequestProperty("charset", "utf-8")
-    conn.setDoOutput(true)
-    val out = new DataOutputStream(conn.getOutputStream)
-    Utils.tryWithSafeFinally {
-      out.write(json.getBytes(Charsets.UTF_8))
-    } {
-      out.close()
-    }
-    readResponse(conn)
-  }
-
-  /**
-   * Read the response from the server and return it as a validated [[SubmitRestProtocolResponse]].
-   * If the response represents an error, report the embedded message to the user.
-   * Exposed for testing.
-   */
-  private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
-    try {
-      val dataStream =
-        if (connection.getResponseCode == HttpServletResponse.SC_OK) {
-          connection.getInputStream
-        } else {
-          connection.getErrorStream
-        }
-      // If the server threw an exception while writing a response, it will not have a body
-      if (dataStream == null) {
-        throw new SubmitRestProtocolException("Server returned empty body")
-      }
-      val responseJson = Source.fromInputStream(dataStream).mkString
-      logDebug(s"Response from the server:\n$responseJson")
-      val response = SubmitRestProtocolMessage.fromJson(responseJson)
-      response.validate()
-      response match {
-        // If the response is an error, log the message
-        case error: ErrorResponse =>
-          logError(s"Server responded with error:\n${error.message}")
-          error
-        // Otherwise, simply return the response
-        case response: SubmitRestProtocolResponse => response
-        case unexpected =>
-          throw new SubmitRestProtocolException(
-            s"Message received from server was not a response:\n${unexpected.toJson}")
-      }
-    } catch {
-      case unreachable @ (_: FileNotFoundException | _: SocketException) =>
-        throw new SubmitRestConnectionException(
-          s"Unable to connect to server ${connection.getURL}", unreachable)
-      case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
-        throw new SubmitRestProtocolException(
-          "Malformed response received from server", malformed)
-    }
-  }
-
-  /** Return the REST URL for creating a new submission. */
-  private def getSubmitUrl(master: String): URL = {
-    val baseUrl = getBaseUrl(master)
-    new URL(s"$baseUrl/create")
-  }
-
-  /** Return the REST URL for killing an existing submission. */
-  private def getKillUrl(master: String, submissionId: String): URL = {
-    val baseUrl = getBaseUrl(master)
-    new URL(s"$baseUrl/kill/$submissionId")
-  }
-
-  /** Return the REST URL for requesting the status of an existing submission. */
-  private def getStatusUrl(master: String, submissionId: String): URL = {
-    val baseUrl = getBaseUrl(master)
-    new URL(s"$baseUrl/status/$submissionId")
-  }
-
-  /** Return the base URL for communicating with the server, including the protocol version. */
-  private def getBaseUrl(master: String): String = {
-    val masterUrl = master.stripPrefix("spark://").stripSuffix("/")
-    s"http://$masterUrl/$PROTOCOL_VERSION/submissions"
-  }
-
-  /** Throw an exception if this is not standalone mode. */
-  private def validateMaster(master: String): Unit = {
-    if (!master.startsWith("spark://")) {
-      throw new IllegalArgumentException("This REST client is only supported in standalone mode.")
-    }
-  }
-
-  /** Report the status of a newly created submission. */
-  private def reportSubmissionStatus(
-      master: String,
-      submitResponse: CreateSubmissionResponse): Unit = {
-    if (submitResponse.success) {
-      val submissionId = submitResponse.submissionId
-      if (submissionId != null) {
-        logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
-        pollSubmissionStatus(master, submissionId)
-      } else {
-        // should never happen
-        logError("Application successfully submitted, but submission ID was not provided!")
-      }
-    } else {
-      val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
-      logError(s"Application submission failed$failMessage")
-    }
-  }
-
-  /**
-   * Poll the status of the specified submission and log it.
-   * This retries up to a fixed number of times before giving up.
-   */
-  private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
-    (1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
-      val response = requestSubmissionStatus(master, submissionId, quiet = true)
-      val statusResponse = response match {
-        case s: SubmissionStatusResponse => s
-        case _ => return // unexpected type, let upstream caller handle it
-      }
-      if (statusResponse.success) {
-        val driverState = Option(statusResponse.driverState)
-        val workerId = Option(statusResponse.workerId)
-        val workerHostPort = Option(statusResponse.workerHostPort)
-        val exception = Option(statusResponse.message)
-        // Log driver state, if present
-        driverState match {
-          case Some(state) => logInfo(s"State of driver $submissionId is now $state.")
-          case _ => logError(s"State of driver $submissionId was not found!")
-        }
-        // Log worker node, if present
-        (workerId, workerHostPort) match {
-          case (Some(id), Some(hp)) => logInfo(s"Driver is running on worker $id at $hp.")
-          case _ =>
-        }
-        // Log exception stack trace, if present
-        exception.foreach { e => logError(e) }
-        return
-      }
-      Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
-    }
-    logError(s"Error: Master did not recognize driver $submissionId.")
-  }
-
-  /** Log the response sent by the server in the REST application submission protocol. */
-  private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = {
-    logInfo(s"Server responded with ${response.messageType}:\n${response.toJson}")
-  }
-
-  /** Log an appropriate error if the response sent by the server is not of the expected type. */
-  private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
-    logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
-  }
-}
-
-private[rest] object StandaloneRestClient {
-  private val REPORT_DRIVER_STATUS_INTERVAL = 1000
-  private val REPORT_DRIVER_STATUS_MAX_TRIES = 10
-  val PROTOCOL_VERSION = "v1"
-
-  /**
-   * Submit an application, assuming Spark parameters are specified through the given config.
-   * This is abstracted to its own method for testing purposes.
-   */
-  def run(
-      appResource: String,
-      mainClass: String,
-      appArgs: Array[String],
-      conf: SparkConf,
-      env: Map[String, String] = sys.env): SubmitRestProtocolResponse = {
-    val master = conf.getOption("spark.master").getOrElse {
-      throw new IllegalArgumentException("'spark.master' must be set.")
-    }
-    val sparkProperties = conf.getAll.toMap
-    val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
-    val client = new StandaloneRestClient
-    val submitRequest = client.constructSubmitRequest(
-      appResource, mainClass, appArgs, sparkProperties, environmentVariables)
-    client.createSubmission(master, submitRequest)
-  }
-
-  def main(args: Array[String]): Unit = {
-    if (args.size < 2) {
-      sys.error("Usage: StandaloneRestClient [app resource] [main class] [app args*]")
-      sys.exit(1)
-    }
-    val appResource = args(0)
-    val mainClass = args(1)
-    val appArgs = args.slice(2, args.size)
-    val conf = new SparkConf
-    run(appResource, mainClass, appArgs, conf)
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org