Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:09 UTC

[04/37] git commit: Initial cut at driver submission.

Initial cut at driver submission.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6a4acc4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6a4acc4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6a4acc4c

Branch: refs/heads/master
Commit: 6a4acc4c2d5c510cc76049dd8727cec76a2173e8
Parents: 1070b56
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sat Dec 21 21:08:13 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Dec 25 01:19:01 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala |  43 ++++-
 .../apache/spark/deploy/DriverDescription.scala |  27 +++
 .../apache/spark/deploy/client/AppClient.scala  |   2 +-
 .../spark/deploy/client/DriverClient.scala      | 129 ++++++++++++++
 .../apache/spark/deploy/master/DriverInfo.scala |  38 ++++
 .../spark/deploy/master/DriverState.scala       |  34 ++++
 .../master/FileSystemPersistenceEngine.scala    |  15 +-
 .../org/apache/spark/deploy/master/Master.scala | 152 ++++++++++++++--
 .../spark/deploy/master/PersistenceEngine.scala |  11 +-
 .../apache/spark/deploy/master/WorkerInfo.scala |  14 ++
 .../master/ZooKeeperPersistenceEngine.scala     |  14 +-
 .../spark/deploy/master/ui/IndexPage.scala      |  42 ++++-
 .../spark/deploy/worker/DriverRunner.scala      | 178 +++++++++++++++++++
 .../org/apache/spark/deploy/worker/Worker.scala |  60 +++++--
 .../spark/deploy/worker/ui/IndexPage.scala      |  36 +++-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  39 ++--
 16 files changed, 781 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 2753317..6743526 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -20,10 +20,11 @@ package org.apache.spark.deploy
 import scala.collection.immutable.List
 
 import org.apache.spark.deploy.ExecutorState.ExecutorState
-import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.master.{DriverInfo, WorkerInfo, ApplicationInfo}
 import org.apache.spark.deploy.master.RecoveryState.MasterState
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.master.DriverState.DriverState
 
 
 private[deploy] sealed trait DeployMessage extends Serializable
@@ -54,7 +55,14 @@ private[deploy] object DeployMessages {
       exitStatus: Option[Int])
     extends DeployMessage
 
-  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
+  case class DriverStateChanged(
+      driverId: String,
+      state: DriverState,
+      exception: Option[Exception])
+    extends DeployMessage
+
+  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
+     driverIds: Seq[String])
 
   case class Heartbeat(workerId: String) extends DeployMessage
 
@@ -76,14 +84,19 @@ private[deploy] object DeployMessages {
       sparkHome: String)
     extends DeployMessage
 
-  // Client to Master
+  case class LaunchDriver(driverId: String, jarUrl: String, mainClass: String, mem: Int)
+    extends DeployMessage
+
+  case class KillDriver(driverId: String) extends DeployMessage
+
+  // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription)
     extends DeployMessage
 
   case class MasterChangeAcknowledged(appId: String)
 
-  // Master to Client
+  // Master to AppClient
 
   case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
 
@@ -97,11 +110,21 @@ private[deploy] object DeployMessages {
 
   case class ApplicationRemoved(message: String)
 
-  // Internal message in Client
+  // DriverClient <-> Master
+
+  case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
+
+  case class SubmitDriverResponse(success: Boolean, message: String) extends DeployMessage
+
+  case class RequestKillDriver(driverId: String) extends DeployMessage
+
+  case class KillDriverResponse(success: Boolean, message: String) extends DeployMessage
+
+  // Internal message in AppClient
 
-  case object StopClient
+  case object StopAppClient
 
-  // Master to Worker & Client
+  // Master to Worker & AppClient
 
   case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
 
@@ -112,6 +135,7 @@ private[deploy] object DeployMessages {
   // Master to MasterWebUI
 
   case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
+    activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
     activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
     status: MasterState) {
 
@@ -128,7 +152,8 @@ private[deploy] object DeployMessages {
   // Worker to WorkerWebUI
 
   case class WorkerStateResponse(host: String, port: Int, workerId: String,
-    executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
+    executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner],
+    drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
     cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
 
     Utils.checkHost(host, "Required hostname")

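The RequestSubmitDriver/RequestKillDriver pairs above define a simple ask/reply protocol between a driver client and the standalone master. A minimal sketch of one submit round trip, using the same ask pattern that the DriverClient added later in this commit wraps; the object name and method are hypothetical, and it is assumed to live under org.apache.spark.deploy.client so the private[deploy] messages are visible:

    package org.apache.spark.deploy.client

    import java.util.concurrent.TimeUnit
    import scala.concurrent.Await
    import scala.concurrent.duration.Duration
    import akka.actor.ActorSystem
    import akka.pattern.ask
    import org.apache.spark.deploy.DriverDescription
    import org.apache.spark.deploy.DeployMessages.{RequestSubmitDriver, SubmitDriverResponse}
    import org.apache.spark.deploy.master.Master

    // Illustrative only: send RequestSubmitDriver to the master and block on its reply.
    object SubmitExample {
      def submitDriver(actorSystem: ActorSystem, masterUrl: String,
          desc: DriverDescription): SubmitDriverResponse = {
        val timeout = Duration.create(10, TimeUnit.SECONDS)
        val master = actorSystem.actorSelection(Master.toAkkaUrl(masterUrl))
        val reply = master.ask(RequestSubmitDriver(desc))(timeout)
        Await.result(reply, timeout).asInstanceOf[SubmitDriverResponse]
      }
    }
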
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
new file mode 100644
index 0000000..52f6b1b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+private[spark] class DriverDescription(
+    val jarUrl: String,
+    val mainClass: String,
+    val mem: Integer) // TODO: Should this be Long?
+  extends Serializable {
+
+  override def toString: String = s"DriverDescription ($mainClass)"
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index c5a0d1f..737ba09 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -147,7 +147,7 @@ private[spark] class AppClient(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 
-      case StopClient =>
+      case StopAppClient =>
         markDead()
         sender ! true
         context.stop(self)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
new file mode 100644
index 0000000..482bafd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import akka.actor._
+import akka.remote.{RemotingLifecycleEvent}
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.deploy.{DeployMessage, DriverDescription}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.{MasterArguments, Master}
+import akka.pattern.ask
+
+import org.apache.spark.util.{Utils, AkkaUtils}
+import scala.concurrent.duration.{FiniteDuration, Duration}
+import java.util.concurrent.TimeUnit
+import akka.util.Timeout
+import scala.concurrent.Await
+import akka.actor.Actor.emptyBehavior
+
+/**
+ * Parent class for actors that to send a single message to the standalone master and then die.
+ */
+private[spark] abstract class SingleMessageClient(
+    actorSystem: ActorSystem, master: String, message: DeployMessage)
+  extends Logging {
+
+  // Concrete child classes must implement
+  def handleResponse(response: Any)
+
+  var actor: ActorRef = actorSystem.actorOf(Props(new DriverActor()))
+
+  class DriverActor extends Actor with Logging {
+    override def preStart() {
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+      logInfo("Sending message to master " + master + "...")
+      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
+      val timeoutDuration: FiniteDuration = Duration.create(
+        System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
+      val submitFuture = masterActor.ask(message)(timeoutDuration)
+      handleResponse(Await.result(submitFuture, timeoutDuration))
+      actorSystem.stop(actor)
+      actorSystem.shutdown()
+    }
+
+    override def receive = emptyBehavior
+  }
+}
+
+/**
+ * Submits a driver to the master.
+ */
+private[spark] class SubmissionClient(actorSystem: ActorSystem, master: String,
+    driverDescription: DriverDescription)
+    extends SingleMessageClient(actorSystem, master, RequestSubmitDriver(driverDescription)) {
+
+  override def handleResponse(response: Any) {
+    val resp = response.asInstanceOf[SubmitDriverResponse]
+    if (!resp.success) {
+      logError(s"Error submitting driver to $master")
+      logError(resp.message)
+    }
+  }
+}
+
+/**
+ * Terminates a client at the master.
+ */
+private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, driverId: String)
+    extends SingleMessageClient(actorSystem, master, RequestKillDriver(driverId)) {
+
+  override def handleResponse(response: Any) {
+    val resp = response.asInstanceOf[KillDriverResponse]
+    if (!resp.success) {
+      logError(s"Error terminating $driverId at $master")
+      logError(resp.message)
+    }
+  }
+}
+
+/**
+ * Callable utility for starting and terminating drivers inside of the standalone scheduler.
+ */
+object DriverClient {
+
+  def main(args: Array[String]) {
+    if (args.size < 3) {
+      println("usage: DriverClient launch <active-master> <jar-url> <main-class>")
+      println("usage: DriverClient kill <active-master> <driver-id>")
+      System.exit(-1)
+    }
+
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "driverSubmission", Utils.localHostName(), 0)
+
+    // TODO Should be configurable
+    val mem = 512
+
+    args(0) match {
+      case "launch" =>
+        val master = args(1)
+        val jarUrl = args(2)
+        val mainClass = args(3)
+        val driverDescription = new DriverDescription(jarUrl, mainClass, mem)
+        val client = new SubmissionClient(actorSystem, master, driverDescription)
+
+      case "kill" =>
+        val master = args(1)
+        val driverId = args(2)
+        val client = new TerminationClient(actorSystem, master, driverId)
+    }
+    actorSystem.awaitTermination()
+  }
+}

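Illustrative invocations of the new client, matching the usage strings in main() above. The spark-class launcher, hostnames, HDFS path, and main class are placeholders; the driver id follows the driver-<timestamp>-<seq> shape produced by newDriverId in the Master changes later in this message:

    ./spark-class org.apache.spark.deploy.client.DriverClient launch \
      spark://master-host:7077 hdfs://namenode:9000/user/me/my-app.jar com.example.MyDriver

    ./spark-class org.apache.spark.deploy.client.DriverClient kill \
      spark://master-host:7077 driver-20131221210813-0000
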
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
new file mode 100644
index 0000000..69d150a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy.{DriverDescription, ApplicationDescription}
+import java.util.Date
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+private[spark] class DriverInfo(
+    val startTime: Long,
+    val id: String,
+    val desc: DriverDescription,
+    val submitDate: Date)
+  extends Serializable {
+
+  @transient var state: DriverState.Value = DriverState.SUBMITTED
+  /* If we fail when launching the driver, the exception is stored here. */
+  @transient var exception: Option[Exception] = None
+  /* Most recent worker assigned to this driver */
+  @transient var worker: Option[WorkerInfo] = None
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
new file mode 100644
index 0000000..230dab1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object DriverState extends Enumeration {
+
+  type DriverState = Value
+
+  // SUBMITTED: Submitted but not yet scheduled on a worker
+  // RUNNING: Has been allocated to a worker to run
+  // FINISHED: Previously ran and exited cleanly
+  // RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again
+  // UNKNOWN: The state of the driver is temporarily not known due to master failure recovery
+  // KILLED: A user manually killed this driver
+  // FAILED: Unable to run due to an unrecoverable error (e.g. missing jar file)
+  val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED = Value
+
+  val MAX_NUM_RETRY = 10
+}

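Of these, only FINISHED, KILLED, and FAILED are terminal states that a worker reports back via DriverStateChanged; the Master changes later in this message throw on anything else. A hypothetical helper capturing that check:

    import org.apache.spark.deploy.master.DriverState
    import org.apache.spark.deploy.master.DriverState.DriverState

    // Illustrative only: the states a DriverRunner is expected to report when a driver stops.
    def isTerminal(state: DriverState): Boolean =
      state == DriverState.FINISHED || state == DriverState.KILLED || state == DriverState.FAILED
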
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 043945a..44a046b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -47,6 +47,15 @@ private[spark] class FileSystemPersistenceEngine(
     new File(dir + File.separator + "app_" + app.id).delete()
   }
 
+  override def addDriver(driver: DriverInfo) {
+    val driverFile = new File(dir + File.separator + "driver_" + driver.id)
+    serializeIntoFile(driverFile, driver)
+  }
+
+  override def removeDriver(driver: DriverInfo) {
+    new File(dir + File.separator + "driver_" + driver.id).delete()
+  }
+
   override def addWorker(worker: WorkerInfo) {
     val workerFile = new File(dir + File.separator + "worker_" + worker.id)
     serializeIntoFile(workerFile, worker)
@@ -56,13 +65,15 @@ private[spark] class FileSystemPersistenceEngine(
     new File(dir + File.separator + "worker_" + worker.id).delete()
   }
 
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
     val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
     val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
     val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+    val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
+    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
     val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
     val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
-    (apps, workers)
+    (apps, drivers, workers)
   }
 
   private def serializeIntoFile(file: File, value: AnyRef) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd079..76af332 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.{DriverDescription, ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -47,7 +47,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
 
-  var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
   val idToWorker = new HashMap[String, WorkerInfo]
   val actorToWorker = new HashMap[ActorRef, WorkerInfo]
@@ -57,9 +56,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val idToApp = new HashMap[String, ApplicationInfo]
   val actorToApp = new HashMap[ActorRef, ApplicationInfo]
   val addressToApp = new HashMap[Address, ApplicationInfo]
-
   val waitingApps = new ArrayBuffer[ApplicationInfo]
   val completedApps = new ArrayBuffer[ApplicationInfo]
+  var nextAppNumber = 0
+
+  val drivers = new HashSet[DriverInfo]
+  val completedDrivers = new ArrayBuffer[DriverInfo]
+  val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling
+  var nextDriverNumber = 0
 
   Utils.checkHost(host, "Expected hostname")
 
@@ -134,14 +138,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   override def receive = {
     case ElectedLeader => {
-      val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
-      state = if (storedApps.isEmpty && storedWorkers.isEmpty)
+      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
+      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
         RecoveryState.ALIVE
       else
         RecoveryState.RECOVERING
       logInfo("I have been elected leader! New state: " + state)
       if (state == RecoveryState.RECOVERING) {
-        beginRecovery(storedApps, storedWorkers)
+        beginRecovery(storedApps, storedDrivers, storedWorkers)
         context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
       }
     }
@@ -168,6 +172,52 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
 
+    case RequestSubmitDriver(description) => {
+      if (state == RecoveryState.STANDBY) {
+        sender ! SubmitDriverResponse(false, "Standby master cannot accept driver submission")
+      } else {
+        logInfo("Driver submitted " + description.mainClass)
+        val driver = createDriver(description)
+        persistenceEngine.addDriver(driver)
+        waitingDrivers += driver
+        drivers.add(driver)
+        schedule()
+
+        // TODO: It might be good to instead have the submission client poll the master to determine
+        //       the current status of the driver. Since we may already want to expose this.
+
+        sender ! SubmitDriverResponse(true, "Driver successfully submitted")
+      }
+    }
+
+    case RequestKillDriver(driverId) => {
+      if (state == RecoveryState.STANDBY) {
+        sender ! KillDriverResponse(false, "Standby master cannot kill drivers")
+      } else {
+        logInfo("Asked to kill driver " + driverId)
+        val driver = drivers.find(_.id == driverId)
+        driver match {
+          case Some(d) =>
+            if (waitingDrivers.contains(d)) { waitingDrivers -= d }
+            else {
+              // We just notify the worker to kill the driver here. The final bookkeeping occurs
+              // on the return path when the worker submits a state change back to the master
+              // to notify it that the driver was successfully killed.
+              d.worker.foreach { w =>
+                w.actor ! KillDriver(driverId)
+              }
+            }
+            val msg = s"Kill request for $driverId submitted"
+            logInfo(msg)
+            sender ! KillDriverResponse(true, msg)
+          case None =>
+            val msg = s"Could not find running driver $driverId"
+            logWarning(msg)
+            sender ! KillDriverResponse(false, msg)
+        }
+      }
+    }
+
     case RegisterApplication(description) => {
       if (state == RecoveryState.STANDBY) {
         // ignore, don't send response
@@ -210,6 +260,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
 
+    case DriverStateChanged(driverId, state, exception) => {
+      if (!(state == DriverState.FAILED || state == DriverState.FINISHED ||
+          state == DriverState.KILLED)) {
+        throw new Exception(s"Received unexpected state update for driver $driverId: $state")
+      }
+      drivers.find(_.id == driverId) match {
+        case Some(driver) => {
+          drivers -= driver
+          completedDrivers += driver
+          persistenceEngine.removeDriver(driver)
+          driver.state = state
+          driver.exception = exception
+          driver.worker.foreach(w => w.removeDriver(driver))
+        }
+        case None =>
+          logWarning(s"Got driver update for unknown driver $driverId")
+      }
+    }
+
     case Heartbeat(workerId) => {
       idToWorker.get(workerId) match {
         case Some(workerInfo) =>
@@ -231,7 +300,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       if (canCompleteRecovery) { completeRecovery() }
     }
 
-    case WorkerSchedulerStateResponse(workerId, executors) => {
+    case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
       idToWorker.get(workerId) match {
         case Some(worker) =>
           logInfo("Worker has been re-registered: " + workerId)
@@ -244,6 +313,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
             worker.addExecutor(execInfo)
             execInfo.copyState(exec)
           }
+
+          for (driverId <- driverIds) {
+            drivers.find(_.id == driverId).foreach { driver =>
+              driver.worker = Some(worker)
+              driver.state = DriverState.RUNNING
+              worker.drivers(driverId) = driver
+            }
+          }
         case None =>
           logWarning("Scheduler state from unknown worker: " + workerId)
       }
@@ -260,8 +337,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case RequestMasterState => {
-      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
-        state)
+      sender ! MasterStateResponse(host, port, workers.toArray, drivers.toArray,
+        completedDrivers.toArray ,apps.toArray, completedApps.toArray, state)
     }
 
     case CheckForWorkerTimeOut => {
@@ -277,7 +354,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
       apps.count(_.state == ApplicationState.UNKNOWN) == 0
 
-  def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
+  def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
+      storedWorkers: Seq[WorkerInfo]) {
     for (app <- storedApps) {
       logInfo("Trying to recover app: " + app.id)
       try {
@@ -289,6 +367,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
 
+    for (driver <- storedDrivers) {
+      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
+      // will be re-launched when we detect that the worker is missing.
+      drivers += driver
+    }
+
     for (worker <- storedWorkers) {
       logInfo("Trying to recover worker: " + worker.id)
       try {
@@ -312,6 +396,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
     apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
 
+    // Reschedule drivers which were not claimed by any workers
+    drivers.filter(_.worker.isEmpty).foreach { d =>
+      logWarning(s"Driver ${d.id} was not found after master recovery, re-launching")
+      relaunchDriver(d)
+    }
+
     state = RecoveryState.ALIVE
     schedule()
     logInfo("Recovery complete - resuming operations!")
@@ -332,6 +422,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
    */
   def schedule() {
     if (state != RecoveryState.ALIVE) { return }
+    // First schedule drivers, they take strict precedence over applications
+    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
+      for (driver <- Seq(waitingDrivers: _*)) {
+        if (worker.memoryFree > driver.desc.mem) {
+          launchDriver(worker, driver)
+          waitingDrivers -= driver
+        }
+      }
+    }
+
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
@@ -418,9 +518,19 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         exec.id, ExecutorState.LOST, Some("worker lost"), None)
       exec.application.removeExecutor(exec)
     }
+    for (driver <- worker.drivers.values) {
+      relaunchDriver(driver)
+    }
     persistenceEngine.removeWorker(worker)
   }
 
+  def relaunchDriver(driver: DriverInfo) {
+    driver.worker = None
+    driver.state = DriverState.RELAUNCHING
+    waitingDrivers += driver
+    schedule()
+  }
+
   def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
@@ -499,6 +609,28 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
   }
+
+  /** Generate a new driver ID given a driver's submission date */
+  def newDriverId(submitDate: Date): String = {
+    val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
+    nextDriverNumber += 1
+    appId
+  }
+
+  def createDriver(desc: DriverDescription): DriverInfo = {
+    val now = System.currentTimeMillis()
+    val date = new Date(now)
+    new DriverInfo(now, newDriverId(date), desc, date)
+  }
+
+  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
+    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
+    worker.addDriver(driver)
+    driver.worker = Some(worker)
+    worker.actor ! LaunchDriver(driver.id, driver.desc.jarUrl, driver.desc.mainClass,
+      driver.desc.mem)
+    driver.state = DriverState.RUNNING
+  }
 }
 
 private[spark] object Master {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index 94b986c..e3640ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -35,11 +35,15 @@ private[spark] trait PersistenceEngine {
 
   def removeWorker(worker: WorkerInfo)
 
+  def addDriver(driver: DriverInfo)
+
+  def removeDriver(driver: DriverInfo)
+
   /**
    * Returns the persisted data sorted by their respective ids (which implies that they're
    * sorted by time of creation).
    */
-  def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
+  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
 
   def close() {}
 }
@@ -49,5 +53,8 @@ private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
   override def removeApplication(app: ApplicationInfo) {}
   override def addWorker(worker: WorkerInfo) {}
   override def removeWorker(worker: WorkerInfo) {}
-  override def readPersistedData() = (Nil, Nil)
+  override def addDriver(driver: DriverInfo) {}
+  override def removeDriver(driver: DriverInfo) {}
+
+  override def readPersistedData() = (Nil, Nil, Nil)
 }

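The widened contract is easiest to see in a concrete engine. A minimal sketch of a hypothetical in-memory implementation, assumed to live alongside the others in org.apache.spark.deploy.master:

    package org.apache.spark.deploy.master

    import scala.collection.mutable

    // Illustrative only: keeps everything in maps, mirroring the file and ZooKeeper engines.
    private[spark] class InMemoryPersistenceEngine extends PersistenceEngine {
      private val apps = new mutable.LinkedHashMap[String, ApplicationInfo]
      private val drivers = new mutable.LinkedHashMap[String, DriverInfo]
      private val workers = new mutable.LinkedHashMap[String, WorkerInfo]

      override def addApplication(app: ApplicationInfo) { apps(app.id) = app }
      override def removeApplication(app: ApplicationInfo) { apps -= app.id }
      override def addDriver(driver: DriverInfo) { drivers(driver.id) = driver }
      override def removeDriver(driver: DriverInfo) { drivers -= driver.id }
      override def addWorker(worker: WorkerInfo) { workers(worker.id) = worker }
      override def removeWorker(worker: WorkerInfo) { workers -= worker.id }

      // Sorted by id, matching the "sorted by time of creation" note on the trait.
      override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) =
        (apps.values.toSeq.sortBy(_.id),
         drivers.values.toSeq.sortBy(_.id),
         workers.values.toSeq.sortBy(_.id))
    }
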
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index e05f587..27c2ff4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -36,6 +36,7 @@ private[spark] class WorkerInfo(
   assert (port > 0)
 
   @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
+  @transient var drivers: mutable.HashMap[String, DriverInfo] = _
   @transient var state: WorkerState.Value = _
   @transient var coresUsed: Int = _
   @transient var memoryUsed: Int = _
@@ -54,6 +55,7 @@ private[spark] class WorkerInfo(
 
   private def init() {
     executors = new mutable.HashMap
+    drivers = new mutable.HashMap
     state = WorkerState.ALIVE
     coresUsed = 0
     memoryUsed = 0
@@ -83,6 +85,18 @@ private[spark] class WorkerInfo(
     executors.values.exists(_.application == app)
   }
 
+  def addDriver(driver: DriverInfo) {
+    drivers(driver.id) = driver
+    memoryUsed += driver.desc.mem
+    coresUsed += 1
+  }
+
+  def removeDriver(driver: DriverInfo) {
+    drivers -= driver.id
+    memoryUsed -= driver.desc.mem
+    coresUsed -= 1
+  }
+
   def webUiAddress : String = {
     "http://" + this.publicAddress + ":" + this.webUiPort
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 825344b..52df173 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -49,6 +49,14 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
     zk.delete(WORKING_DIR + "/app_" + app.id)
   }
 
+  override def addDriver(driver: DriverInfo) {
+    serializeIntoFile(WORKING_DIR + "/driver_" + driver.id, driver)
+  }
+
+  override def removeDriver(driver: DriverInfo) {
+    zk.delete(WORKING_DIR + "/driver_" + driver.id)
+  }
+
   override def addWorker(worker: WorkerInfo) {
     serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
   }
@@ -61,13 +69,15 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
     zk.close()
   }
 
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
     val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
     val appFiles = sortedFiles.filter(_.startsWith("app_"))
     val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+    val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
+    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
     val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
     val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
-    (apps, workers)
+    (apps, drivers, workers)
   }
 
   private def serializeIntoFile(path: String, value: AnyRef) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 4ef7628..13903b4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -26,7 +26,8 @@ import net.liftweb.json.JsonAST.JValue
 
 import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.master.{DriverInfo, ApplicationInfo, WorkerInfo}
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
@@ -56,6 +57,12 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
     val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
+    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Memory", "Main Class")
+    val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
+    val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
+    val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
+    val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
+
     val content =
         <div class="row-fluid">
           <div class="span12">
@@ -70,6 +77,9 @@ private[spark] class IndexPage(parent: MasterWebUI) {
               <li><strong>Applications:</strong>
                 {state.activeApps.size} Running,
                 {state.completedApps.size} Completed </li>
+              <li><strong>Drivers:</strong>
+                {state.activeDrivers.size} Running,
+                {state.completedDrivers.size} Completed </li>
             </ul>
           </div>
         </div>
@@ -94,7 +104,22 @@ private[spark] class IndexPage(parent: MasterWebUI) {
             <h4> Completed Applications </h4>
             {completedAppsTable}
           </div>
-        </div>;
+        </div>
+
+          <div class="row-fluid">
+            <div class="span12">
+              <h4> Active Drivers </h4>
+
+              {activeDriversTable}
+            </div>
+          </div>
+
+          <div class="row-fluid">
+            <div class="span12">
+              <h4> Completed Drivers </h4>
+              {completedDriversTable}
+            </div>
+          </div>;
     UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
   }
 
@@ -134,4 +159,17 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td>{DeployWebUI.formatDuration(app.duration)}</td>
     </tr>
   }
+
+  def driverRow(driver: DriverInfo): Seq[Node] = {
+    <tr>
+      <td>{driver.id} </td>
+      <td>{driver.submitDate}</td>
+      <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
+      <td>{driver.state}</td>
+      <td sorttable_customkey={driver.desc.mem.toString}>
+        {Utils.megabytesToString(driver.desc.mem.toLong)}
+      </td>
+      <td>{driver.desc.mainClass}</td>
+    </tr>
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
new file mode 100644
index 0000000..fccc36b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import java.io._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.conf.Configuration
+import akka.actor.{ActorRef, ActorSelection}
+import org.apache.spark.deploy.DeployMessages.DriverStateChanged
+import org.apache.spark.deploy.master.DriverState
+
+/**
+ * Manages the execution of one driver process.
+ */
+private[spark] class DriverRunner(
+    val driverId: String,
+    val jarUrl: String,
+    val mainClass: String,
+    val workDir: File,
+    val memory: Int,
+    val worker: ActorRef)
+  extends Logging {
+
+  var process: Option[Process] = None
+  @volatile var killed = false
+
+  /** Starts a thread to run and manage the driver. */
+  def start() = {
+    new Thread("DriverRunner for " + driverId) {
+      override def run() {
+        var exn: Option[Exception] = None
+
+        try {
+          val driverDir = createWorkingDirectory()
+          val localJarFilename = downloadUserJar(driverDir)
+          val command = Seq("java", "-cp", localJarFilename, mainClass)
+          runCommandWithRetry(command, driverDir)
+        }
+        catch {
+          case e: Exception => exn = Some(e)
+        }
+
+        val finalState =
+          if (killed) { DriverState.KILLED }
+          else if (exn.isDefined) { DriverState.FAILED }
+          else { DriverState.FINISHED }
+
+        worker ! DriverStateChanged(driverId, finalState, exn)
+      }
+    }.start()
+  }
+
+  /** Terminate this driver (or prevent it from ever starting if not yet started) */
+  def kill() {
+    killed = true
+    process.foreach(p => p.destroy())
+  }
+
+  /** Spawn a thread that will redirect a given stream to a file */
+  def redirectStream(in: InputStream, file: File) {
+    val out = new FileOutputStream(file, true)
+    new Thread("redirect output to " + file) {
+      override def run() {
+        try {
+          Utils.copyStream(in, out, true)
+        } catch {
+          case e: IOException =>
+            logInfo("Redirection to " + file + " closed: " + e.getMessage)
+        }
+      }
+    }.start()
+  }
+
+  /**
+   * Creates the working directory for this driver.
+   * Will throw an exception if there are errors preparing the directory.
+   */
+  def createWorkingDirectory(): File = {
+    val driverDir = new File(workDir, driverId)
+    if (!driverDir.exists() && !driverDir.mkdirs()) {
+      throw new IOException("Failed to create directory " + driverDir)
+    }
+    driverDir
+  }
+
+  /**
+   * Download the user jar into the supplied directory and return its local path.
+   * Will throw an exception if there are errors downloading the jar.
+   */
+  def downloadUserJar(driverDir: File): String = {
+
+    val jarPath = new Path(jarUrl)
+
+    val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
+    val jarFileSystem = jarPath.getFileSystem(emptyConf)
+
+    val destPath = new Path(driverDir.getAbsolutePath())
+    val destFileSystem = destPath.getFileSystem(emptyConf)
+    val jarFileName = jarPath.getName
+    val localJarFile = new File(driverDir, jarFileName)
+    val localJarFilename = localJarFile.getAbsolutePath
+
+    if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
+      logInfo(s"Copying user jar $jarPath to $destPath")
+      FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+    }
+
+    if (!localJarFile.exists()) { // Verify copy succeeded
+      throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
+    }
+
+    localJarFilename
+  }
+
+  /** Continue launching the supplied command until it exits zero. */
+  def runCommandWithRetry(command: Seq[String], baseDir: File) = {
+    /* Time to wait between submission retries. */
+    var waitSeconds = 1
+    // TODO: We should distinguish between "immediate" exits and cases where it was running
+    //       for a long time and then exits.
+    var cleanExit = false
+
+    while (!cleanExit && !killed) {
+      Thread.sleep(waitSeconds * 1000)
+      val builder = new ProcessBuilder(command: _*).directory(baseDir)
+      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+
+      process = Some(builder.start())
+
+      // Redirect stdout and stderr to files
+      val stdout = new File(baseDir, "stdout")
+      redirectStream(process.get.getInputStream, stdout)
+
+      val stderr = new File(baseDir, "stderr")
+      val header = "Driver Command: %s\n%s\n\n".format(
+        command.mkString("\"", "\" \"", "\""), "=" * 40)
+      Files.write(header, stderr, Charsets.UTF_8)
+      redirectStream(process.get.getErrorStream, stderr)
+
+
+      val exitCode =
+        /* There is a race here I've elected to ignore for now because it's very unlikely and not
+         * simple to fix. This could see `killed=false` then the main thread gets a kill request
+         * and sets `killed=true` and destroys the not-yet-started process, then this thread
+         * launches the process. For now, in that case the user can just re-submit the kill
+         * request. */
+        if (killed) -1
+        else process.get.waitFor()
+
+      cleanExit = exitCode == 0
+      if (!cleanExit && !killed) {
+        waitSeconds = waitSeconds * 2 // exponential back-off
+        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
+      }
+    }
+  }
+}

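To make the TODO above concrete: the jar URL is resolved through Hadoop's FileSystem API, so it should be a fully qualified path rather than a bare local filename. An illustrative layout for one driver, where all names are placeholders and <workDir> is the worker's work directory handed to DriverRunner:

    jarUrl given to the master:    hdfs://namenode:9000/user/me/my-app.jar
    per-driver working directory:  <workDir>/driver-20131221210813-0000/
    jar copied locally to:         <workDir>/driver-20131221210813-0000/my-app.jar
    launch command:                java -cp <workDir>/driver-20131221210813-0000/my-app.jar com.example.MyDriver
    captured output:               <workDir>/driver-20131221210813-0000/stdout and .../stderr
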
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 87531b6..a2b491a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -30,18 +30,10 @@ import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
 import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.Master
+import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
 
 /**
   * @param masterUrls Each url should look like spark://host:port.
@@ -83,6 +75,9 @@ private[spark] class Worker(
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
+  val drivers = new HashMap[String, DriverRunner]
+  val finishedDrivers = new HashMap[String, DriverRunner]
+
   val publicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
@@ -193,7 +188,7 @@ private[spark] class Worker(
 
       val execs = executors.values.
         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
-      sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+      sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
 
     case RegisterWorkerFailed(message) =>
       if (!registered) {
@@ -247,13 +242,56 @@ private[spark] class Worker(
         }
       }
 
+    case LaunchDriver(driverId, jarUrl, mainClass, memory) => {
+      logInfo(s"Asked to launch driver $driverId")
+      val driver = new DriverRunner(driverId, jarUrl, mainClass, workDir, memory, self)
+      drivers(driverId) = driver
+      driver.start()
+
+      coresUsed += 1
+      memoryUsed += memory
+    }
+
+    case KillDriver(driverId) => {
+      logInfo(s"Asked to kill driver $driverId")
+
+      drivers.find(_._1 == driverId) match {
+        case Some((id, runner)) =>
+          runner.kill()
+        case None =>
+          logError(s"Asked to kill unknown driver $driverId")
+      }
+
+    }
+
+
+    case DriverStateChanged(driverId, state, exception) => {
+      state match {
+        case DriverState.FAILED =>
+          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+        case DriverState.FINISHED =>
+          logInfo(s"Driver $driverId exited successfully")
+        case DriverState.KILLED =>
+          logInfo(s"Driver $driverId was killed")
+      }
+      masterLock.synchronized {
+        master ! DriverStateChanged(driverId, state, exception)
+      }
+      val driver = drivers(driverId)
+      memoryUsed -= driver.memory
+      coresUsed -= 1
+      drivers -= driverId
+      finishedDrivers(driverId) = driver
+    }
+
     case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
       logInfo(s"$x Disassociated !")
       masterDisconnected()
 
     case RequestWorkerState => {
       sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
-        finishedExecutors.values.toList, activeMasterUrl, cores, memory,
+        finishedExecutors.values.toList, drivers.values.toList,
+        finishedDrivers.values.toList, activeMasterUrl, cores, memory,
         coresUsed, memoryUsed, activeMasterWebUiUrl)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 0d59048..e233b82 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -30,7 +30,7 @@ import net.liftweb.json.JsonAST.JValue
 
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
@@ -56,6 +56,12 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
     val finishedExecutorTable =
       UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
 
+    val driverHeaders = Seq("DriverID", "Main Class", "Memory", "Logs")
+    val runningDriverTable =
+      UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
+    def finishedDriverTable =
+      UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers)
+
     val content =
         <div class="row-fluid"> <!-- Worker Details -->
           <div class="span12">
@@ -84,6 +90,20 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
             <h4> Finished Executors </h4>
             {finishedExecutorTable}
           </div>
+        </div>
+
+        <div class="row-fluid"> <!-- Running Drivers -->
+          <div class="span12">
+            <h4> Running Drivers {workerState.drivers.size} </h4>
+            {runningDriverTable}
+          </div>
+        </div>
+
+        <div class="row-fluid"> <!-- Finished Drivers  -->
+          <div class="span12">
+            <h4> Finished Drivers </h4>
+            {finishedDriverTable}
+          </div>
         </div>;
 
     UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
@@ -111,6 +131,20 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
           .format(executor.appId, executor.execId)}>stderr</a>
       </td> 
     </tr>
+
   }
 
+  def driverRow(driver: DriverRunner): Seq[Node] = {
+    <tr>
+      <td>{driver.driverId}</td>
+      <td>{driver.mainClass}</td>
+      <td sorttable_customkey={driver.memory.toString}>
+        {Utils.megabytesToString(driver.memory)}
+      </td>
+      <td>
+        <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
+        <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
+      </td>
+    </tr>
+  }
 }

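The stdout/stderr links above resolve against the worker web UI's log endpoints, which the next file extends to accept a driverId in place of appId/executorId. Illustrative URLs, with host, port, and driver id as placeholders and 102400 being the default byteLength:

    http://worker-host:8081/logPage?driverId=driver-20131221210813-0000&logType=stdout
    http://worker-host:8081/log?driverId=driver-20131221210813-0000&logType=stderr&offset=0&byteLength=102400
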
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a4acc4c/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 40d6bdb..d128e58 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -69,30 +69,44 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
   def log(request: HttpServletRequest): String = {
     val defaultBytes = 100 * 1024
-    val appId = request.getParameter("appId")
-    val executorId = request.getParameter("executorId")
+
+    val appId = Option(request.getParameter("appId"))
+    val executorId = Option(request.getParameter("executorId"))
+    val driverId = Option(request.getParameter("driverId"))
     val logType = request.getParameter("logType")
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+
+    val path = (appId, executorId, driverId) match {
+      case (Some(a), Some(e), None) =>
+        s"${workDir.getPath}/$appId/$executorId/$logType"
+      case (None, None, Some(d)) =>
+        s"${workDir.getPath}/$driverId/$logType"
+    }
 
     val (startByte, endByte) = getByteRange(path, offset, byteLength)
     val file = new File(path)
     val logLength = file.length
 
-    val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
-      .format(startByte, endByte, logLength, appId, executorId, logType)
+    val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
     pre + Utils.offsetBytes(path, startByte, endByte)
   }
 
   def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
     val defaultBytes = 100 * 1024
-    val appId = request.getParameter("appId")
-    val executorId = request.getParameter("executorId")
+    val appId = Option(request.getParameter("appId"))
+    val executorId = Option(request.getParameter("executorId"))
+    val driverId = Option(request.getParameter("driverId"))
     val logType = request.getParameter("logType")
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+
+    val (path, params) = (appId, executorId, driverId) match {
+      case (Some(a), Some(e), None) =>
+        (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+      case (None, None, Some(d)) =>
+        (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+    }
 
     val (startByte, endByte) = getByteRange(path, offset, byteLength)
     val file = new File(path)
@@ -106,9 +120,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
     val backButton =
       if (startByte > 0) {
-        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
-          .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
-          byteLength)}>
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+          .format(params, logType, math.max(startByte-byteLength, 0), byteLength)}>
           <button type="button" class="btn btn-default">
             Previous {Utils.bytesToString(math.min(byteLength, startByte))}
           </button>
@@ -122,8 +135,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
     val nextButton =
       if (endByte < logLength) {
-        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
-          format(appId, executorId, logType, endByte, byteLength)}>
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s".
+          format(params, logType, endByte, byteLength)}>
           <button type="button" class="btn btn-default">
             Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
           </button>