You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:24 UTC

[40/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/Command.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala
new file mode 100644
index 0000000..fa8af9a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.Map
+
+/**
+ * Immutable bundle describing a command: the name of a main class, the arguments to hand to
+ * it, and a map of environment entries.
+ *
+ * @param mainClass   name of the class to use as the entry point
+ * @param arguments   arguments, in order
+ * @param environment environment entries keyed by name
+ */
+private[spark] case class Command(
+    mainClass: String,
+    arguments: Seq[String],
+    environment: Map[String, String])

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
new file mode 100644
index 0000000..4dc6ada
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.immutable.List
+
+import org.apache.spark.Utils
+import org.apache.spark.deploy.ExecutorState.ExecutorState
+import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.worker.ExecutorRunner
+
+
+/**
+ * Common supertype for deploy messages. The trait is sealed, so all of its direct subtypes
+ * must be declared in this source file, and it extends Serializable so that every message
+ * mixing it in is serializable as well.
+ */
+private[deploy] sealed trait DeployMessage extends Serializable
+
+/**
+ * Namespace for the messages exchanged between the deploy components. The messages are grouped
+ * below by sender and receiver.
+ */
+private[deploy] object DeployMessages {
+
+  // Worker to Master
+
+  /** Registration request from a worker. Construction checks `host` and asserts `port > 0`. */
+  case class RegisterWorker(
+      id: String,
+      host: String,
+      port: Int,
+      cores: Int,
+      memory: Int,
+      webUiPort: Int,
+      publicAddress: String)
+    extends DeployMessage {
+    Utils.checkHost(host, "Required hostname")
+    assert(port > 0)
+  }
+
+  /** Reports a new state for executor `execId` of application `appId`. */
+  case class ExecutorStateChanged(
+      appId: String,
+      execId: Int,
+      state: ExecutorState,
+      message: Option[String],
+      exitStatus: Option[Int])
+    extends DeployMessage
+
+  /** Heartbeat identifying the sending worker. */
+  case class Heartbeat(workerId: String) extends DeployMessage
+
+  // Master to Worker
+
+  /** Positive reply to a worker registration, carrying the master's web UI URL. */
+  case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
+
+  /** Negative reply to a worker registration, carrying an explanatory message. */
+  case class RegisterWorkerFailed(message: String) extends DeployMessage
+
+  /** Asks for executor `execId` of application `appId` to be killed. */
+  case class KillExecutor(appId: String, execId: Int) extends DeployMessage
+
+  /** Asks for an executor to be launched for an application with the given resources. */
+  case class LaunchExecutor(
+      appId: String,
+      execId: Int,
+      appDesc: ApplicationDescription,
+      cores: Int,
+      memory: Int,
+      sparkHome: String)
+    extends DeployMessage
+
+  // Client to Master
+
+  /** Registration request for an application. */
+  case class RegisterApplication(appDescription: ApplicationDescription)
+    extends DeployMessage
+
+  // Master to Client
+
+  /** Reply to an application registration, carrying the assigned application ID. */
+  case class RegisteredApplication(appId: String) extends DeployMessage
+
+  // TODO(matei): replace hostPort with host
+  /** Announces a new executor. Construction checks that `hostPort` is a valid host:port. */
+  case class ExecutorAdded(
+      id: Int,
+      workerId: String,
+      hostPort: String,
+      cores: Int,
+      memory: Int) {
+    Utils.checkHostPort(hostPort, "Required hostport")
+  }
+
+  /** Announces a state change of executor `id`, with an optional message and exit status. */
+  case class ExecutorUpdated(
+      id: Int,
+      state: ExecutorState,
+      message: Option[String],
+      exitStatus: Option[Int])
+
+  /** Tells the client that its application was removed, with the reason as text. */
+  case class ApplicationRemoved(message: String)
+
+  // Internal message in Client
+
+  case object StopClient
+
+  // MasterWebUI To Master
+
+  case object RequestMasterState
+
+  // Master to MasterWebUI
+
+  /** Snapshot of master state. Construction checks `host` and asserts `port > 0`. */
+  case class MasterStateResponse(
+      host: String,
+      port: Int,
+      workers: Array[WorkerInfo],
+      activeApps: Array[ApplicationInfo],
+      completedApps: Array[ApplicationInfo]) {
+
+    Utils.checkHost(host, "Required hostname")
+    assert(port > 0)
+
+    /** The master URL in the form spark://host:port (the scheme is already included). */
+    def uri: String = "spark://" + host + ":" + port
+  }
+
+  //  WorkerWebUI to Worker
+
+  case object RequestWorkerState
+
+  // Worker to WorkerWebUI
+
+  /** Snapshot of worker state. Construction checks `host` and asserts `port > 0`. */
+  case class WorkerStateResponse(
+      host: String,
+      port: Int,
+      workerId: String,
+      executors: List[ExecutorRunner],
+      finishedExecutors: List[ExecutorRunner],
+      masterUrl: String,
+      cores: Int,
+      memory: Int,
+      coresUsed: Int,
+      memoryUsed: Int,
+      masterWebUiUrl: String) {
+
+    Utils.checkHost(host, "Required hostname")
+    assert(port > 0)
+  }
+
+  // Actor System to Master
+
+  case object CheckForWorkerTimeOut
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
new file mode 100644
index 0000000..fcfea96
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+/**
+ * Enumeration of the states an executor can be in, together with a helper that recognises
+ * the finished ones.
+ */
+private[spark] object ExecutorState
+  extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
+
+  type ExecutorState = Value
+
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+
+  /** Returns true exactly when `state` is KILLED, FAILED or LOST. */
+  def isFinished(state: ExecutorState): Boolean = state match {
+    case KILLED | FAILED | LOST => true
+    case _ => false
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
new file mode 100644
index 0000000..a6be8ef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import net.liftweb.json.JsonDSL._
+
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
+import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.worker.ExecutorRunner
+
+
+/**
+ * Converts deploy state objects into JSON values built with the lift-json DSL. Each method
+ * returns an object whose field names are the lower-case keys shown in its body.
+ */
+private[spark] object JsonProtocol {
+
+  /** JSON view of a single worker: identity, address, resources and current state. */
+  def writeWorkerInfo(obj: WorkerInfo) = {
+    ("id" -> obj.id) ~
+    ("host" -> obj.host) ~
+    ("port" -> obj.port) ~
+    ("webuiaddress" -> obj.webUiAddress) ~
+    ("cores" -> obj.cores) ~
+    ("coresused" -> obj.coresUsed) ~
+    ("memory" -> obj.memory) ~
+    ("memoryused" -> obj.memoryUsed) ~
+    ("state" -> obj.state.toString)
+  }
+
+  /** JSON view of an application known to the master, including its description fields. */
+  def writeApplicationInfo(obj: ApplicationInfo) = {
+    ("starttime" -> obj.startTime) ~
+    ("id" -> obj.id) ~
+    ("name" -> obj.desc.name) ~
+    ("cores" -> obj.desc.maxCores) ~
+    ("user" -> obj.desc.user) ~
+    ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+    ("submitdate" -> obj.submitDate.toString)
+  }
+
+  /** JSON view of an application description. */
+  def writeApplicationDescription(obj: ApplicationDescription) = {
+    ("name" -> obj.name) ~
+    ("cores" -> obj.maxCores) ~
+    ("memoryperslave" -> obj.memoryPerSlave) ~
+    ("user" -> obj.user)
+  }
+
+  /** JSON view of an executor runner, with its application description nested as "appdesc". */
+  def writeExecutorRunner(obj: ExecutorRunner) = {
+    ("id" -> obj.execId) ~
+    ("memory" -> obj.memory) ~
+    ("appid" -> obj.appId) ~
+    ("appdesc" -> writeApplicationDescription(obj.appDesc))
+  }
+
+  /**
+   * JSON view of the master state: its URL, every worker, resource totals summed over the
+   * workers, and the active and completed applications.
+   */
+  def writeMasterState(obj: MasterStateResponse) = {
+    // MasterStateResponse.uri already includes the "spark://" scheme, so it is emitted as-is;
+    // prepending the scheme again would produce "spark://spark://host:port".
+    ("url" -> obj.uri) ~
+    ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
+    ("cores" -> obj.workers.map(_.cores).sum) ~
+    ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
+    ("memory" -> obj.workers.map(_.memory).sum) ~
+    ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
+    ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
+    ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
+  }
+
+  /** JSON view of a worker's state: master URLs, resources, and running/finished executors. */
+  def writeWorkerState(obj: WorkerStateResponse) = {
+    ("id" -> obj.workerId) ~
+    ("masterurl" -> obj.masterUrl) ~
+    ("masterwebuiurl" -> obj.masterWebUiUrl) ~
+    ("cores" -> obj.cores) ~
+    ("coresused" -> obj.coresUsed) ~
+    ("memory" -> obj.memory) ~
+    ("memoryused" -> obj.memoryUsed) ~
+    ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
+    ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
new file mode 100644
index 0000000..af5a411
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.AkkaUtils
+import org.apache.spark.{Logging, Utils}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Testing class that creates a Spark standalone cluster in-process (that is, running the
+ * org.apache.spark.deploy.master.Master and org.apache.spark.deploy.worker.Workers in the same
+ * JVM). Executors launched by the Workers still run in separate JVMs. This can be used to test
+ * distributed operation and fault recovery without spinning up a lot of processes.
+ *
+ * @param numWorkers      number of workers started by `start()`
+ * @param coresPerWorker  core count passed to every worker
+ * @param memoryPerWorker memory amount passed to every worker
+ */
+private[spark]
+class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
+
+  private val localHostname = Utils.localHostName()
+  private val masterActorSystems = ArrayBuffer[ActorSystem]()
+  private val workerActorSystems = ArrayBuffer[ActorSystem]()
+
+  /**
+   * Starts one master and `numWorkers` workers on the local host and remembers their actor
+   * systems so that `stop()` can shut them down.
+   *
+   * @return the master URL, in the form spark://host:port
+   */
+  def start(): String = {
+    logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
+
+    /* Start the Master */
+    val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
+    masterActorSystems += masterSystem
+    val masterUrl = "spark://" + localHostname + ":" + masterPort
+
+    /* Start the Workers */
+    for (workerNum <- 1 to numWorkers) {
+      val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
+        memoryPerWorker, masterUrl, null, Some(workerNum))
+      workerActorSystems += workerSystem
+    }
+
+    masterUrl
+  }
+
+  /**
+   * Shuts down every actor system started by `start()` and waits for each to terminate.
+   */
+  def stop() {
+    logInfo("Shutting down local Spark cluster.")
+    // Stop the workers before the master so they don't get upset that it disconnected
+    workerActorSystems.foreach(_.shutdown())
+    workerActorSystems.foreach(_.awaitTermination())
+
+    masterActorSystems.foreach(_.shutdown())
+    masterActorSystems.foreach(_.awaitTermination())
+
+    // Forget the terminated systems so that a later start()/stop() cycle neither shuts them
+    // down a second time nor keeps them reachable.
+    masterActorSystems.clear()
+    workerActorSystems.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
new file mode 100644
index 0000000..0a5f4c3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+
+
+/**
+ * Contains util methods to interact with Hadoop from Spark.
+ */
+class SparkHadoopUtil {
+
+  /**
+   * Return an appropriate (subclass of) Configuration. Creating a Configuration can initialize
+   * some Hadoop subsystems.
+   */
+  def newConfiguration(): Configuration = {
+    val hadoopConf = new Configuration()
+    hadoopConf
+  }
+
+  /**
+   * Add any user credentials to the job conf which are necessary for running on a secure
+   * Hadoop cluster. This implementation adds nothing.
+   */
+  def addCredentials(conf: JobConf): Unit = {}
+
+  /** Whether Spark is running in YARN mode; this implementation always answers false. */
+  def isYarnMode(): Boolean = false
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
new file mode 100644
index 0000000..ae258b5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object DeployWebUI {
+  // SimpleDateFormat is not thread-safe, and this object is shared by every caller, so each
+  // thread gets its own formatter instead of all threads sharing a single instance.
+  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  }
+
+  /** The calling thread's formatter for the pattern yyyy/MM/dd HH:mm:ss. */
+  def DATE_FORMAT: SimpleDateFormat = dateFormat.get()
+
+  /** Formats `date` as yyyy/MM/dd HH:mm:ss. */
+  def formatDate(date: Date): String = DATE_FORMAT.format(date)
+
+  /** Formats a timestamp, given in milliseconds since the epoch, as yyyy/MM/dd HH:mm:ss. */
+  def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+
+  /**
+   * Renders a duration in the coarsest fitting unit: whole seconds below one minute, minutes
+   * with one decimal below ten minutes, whole minutes below one hour, and hours with one
+   * decimal otherwise.
+   *
+   * @param milliseconds the duration in milliseconds
+   */
+  def formatDuration(milliseconds: Long): String = {
+    val seconds = milliseconds.toDouble / 1000
+    val minutes = seconds / 60
+    val hours = minutes / 60
+    if (seconds < 60) {
+      "%.0f s".format(seconds)
+    } else if (minutes < 10) {
+      "%.1f min".format(minutes)
+    } else if (minutes < 60) {
+      "%.0f min".format(minutes)
+    } else {
+      "%.1f h".format(hours)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
new file mode 100644
index 0000000..a342dd7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import java.util.concurrent.TimeoutException
+
+import akka.actor._
+import akka.actor.Terminated
+import akka.pattern.ask
+import akka.util.Duration
+import akka.remote.RemoteClientDisconnected
+import akka.remote.RemoteClientLifeCycleEvent
+import akka.remote.RemoteClientShutdown
+import akka.dispatch.Await
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.Master
+
+
+/**
+ * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
+ * and a listener for cluster events, and calls back the listener when various events occur.
+ */
+private[spark] class Client(
+    actorSystem: ActorSystem,
+    masterUrl: String,
+    appDescription: ApplicationDescription,
+    listener: ClientListener)
+  extends Logging {
+
+  // Reference to the ClientActor; null until start() is called and again after stop().
+  var actor: ActorRef = null
+  // Application ID received in RegisteredApplication; null until that message arrives.
+  var appId: String = null
+
+  /**
+   * Actor that registers the application with the master and forwards the messages it
+   * receives to the listener.
+   */
+  class ClientActor extends Actor with Logging {
+    var master: ActorRef = null
+    var masterAddress: Address = null
+    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
+
+    /** Looks up the master, sends the registration and subscribes to connection events. */
+    override def preStart(): Unit = {
+      logInfo("Connecting to master " + masterUrl)
+      try {
+        master = context.actorFor(Master.toAkkaUrl(masterUrl))
+        masterAddress = master.path.address
+        master ! RegisterApplication(appDescription)
+        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+        context.watch(master)  // Doesn't work with remote actors, but useful for testing
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to master", e)
+          disconnectAndStop()
+      }
+    }
+
+    override def receive = {
+      case RegisteredApplication(assignedId) =>
+        appId = assignedId
+        listener.connected(appId)
+
+      case ApplicationRemoved(message) =>
+        logError("Master removed our application: %s; stopping client".format(message))
+        disconnectAndStop()
+
+      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
+        val fullId = appId + "/" + id
+        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
+        listener.executorAdded(fullId, workerId, hostPort, cores, memory)
+
+      case ExecutorUpdated(id, state, message, exitStatus) =>
+        val fullId = appId + "/" + id
+        val suffix = message.map(text => " (" + text + ")").getOrElse("")
+        logInfo("Executor updated: %s is now %s%s".format(fullId, state, suffix))
+        // Only finished states are reported to the listener.
+        if (ExecutorState.isFinished(state)) {
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
+        }
+
+      case Terminated(terminated) if terminated == master =>
+        masterConnectionLost()
+
+      case RemoteClientDisconnected(_, remoteAddress) if remoteAddress == masterAddress =>
+        masterConnectionLost()
+
+      case RemoteClientShutdown(_, remoteAddress) if remoteAddress == masterAddress =>
+        masterConnectionLost()
+
+      case StopClient =>
+        markDisconnected()
+        sender ! true
+        context.stop(self)
+    }
+
+    /** Logs the loss of the master connection, then notifies the listener and stops. */
+    private def masterConnectionLost(): Unit = {
+      logError("Connection to master failed; stopping client")
+      disconnectAndStop()
+    }
+
+    /** Notifies the listener (at most once) and stops this actor. */
+    private def disconnectAndStop(): Unit = {
+      markDisconnected()
+      context.stop(self)
+    }
+
+    /**
+     * Notify the listener that we disconnected, if we hadn't already done so before.
+     */
+    def markDisconnected(): Unit = {
+      if (!alreadyDisconnected) {
+        listener.disconnected()
+        alreadyDisconnected = true
+      }
+    }
+  }
+
+  /** Launches the client actor; it will call back into the listener. */
+  def start(): Unit = {
+    actor = actorSystem.actorOf(Props(new ClientActor))
+  }
+
+  /**
+   * Asks the client actor to stop and waits for its reply, for at most the number of seconds
+   * given by the spark.akka.askTimeout system property (10 when unset). Does nothing when the
+   * actor was never started or has already been stopped through this method.
+   */
+  def stop(): Unit = {
+    if (actor != null) {
+      try {
+        val timeoutSeconds = System.getProperty("spark.akka.askTimeout", "10").toLong
+        val timeout = Duration.create(timeoutSeconds, "seconds")
+        val reply = actor.ask(StopClient)(timeout)
+        Await.result(reply, timeout)
+      } catch {
+        case e: TimeoutException =>
+          logInfo("Stop request to Master timed out; it may already be shut down.")
+      }
+      actor = null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
new file mode 100644
index 0000000..4605368
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+/**
+ * Callbacks invoked by deploy client when various events happen. There are currently four events:
+ * connecting to the cluster, disconnecting, being given an executor, and having an executor
+ * removed (either due to failure or due to revocation).
+ *
+ * Users of this API should *not* block inside the callback methods.
+ */
+private[spark] trait ClientListener {
+
+  /** Called with the application ID once the client has connected to the cluster. */
+  def connected(appId: String): Unit
+
+  /** Called when the client has disconnected from the cluster. */
+  def disconnected(): Unit
+
+  /** Called when the application is given an executor with the stated resources. */
+  def executorAdded(
+      fullId: String,
+      workerId: String,
+      hostPort: String,
+      cores: Int,
+      memory: Int): Unit
+
+  /** Called when an executor is removed, with a message and an optional exit status. */
+  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
new file mode 100644
index 0000000..0322029
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import org.apache.spark.util.AkkaUtils
+import org.apache.spark.{Logging, Utils}
+import org.apache.spark.deploy.{Command, ApplicationDescription}
+
+/**
+ * Small command-line program that registers a dummy application with a master and logs the
+ * connection events it receives. Takes the master URL as its single argument.
+ */
+private[spark] object TestClient {
+
+  /** Listener that logs connection events and exits the JVM once disconnected. */
+  class TestListener extends ClientListener with Logging {
+    def connected(id: String) {
+      logInfo("Connected to master, got app ID " + id)
+    }
+
+    def disconnected() {
+      logInfo("Disconnected from master")
+      System.exit(0)
+    }
+
+    // Executor events are deliberately ignored by this test listener.
+    def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {}
+
+    def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
+  }
+
+  def main(args: Array[String]) {
+    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
+    if (args.length < 1) {
+      System.err.println("Usage: TestClient <masterUrl>")
+      System.exit(1)
+    }
+    val url = args(0)
+    val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
+    // TestExecutor lives in org.apache.spark.deploy.client, so the command must use its
+    // fully-qualified name under that package.
+    val desc = new ApplicationDescription(
+      "TestClient", 1, 512,
+      Command("org.apache.spark.deploy.client.TestExecutor", Seq(), Map()),
+      "dummy-spark-home", "ignored")
+    val listener = new TestListener
+    val client = new Client(actorSystem, url, desc, listener)
+    client.start()
+    actorSystem.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala
new file mode 100644
index 0000000..c5ac45c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+/**
+ * Minimal main class: prints a greeting and then stays alive forever.
+ */
+private[spark] object TestExecutor {
+  def main(args: Array[String]): Unit = {
+    println("Hello world!")
+    // Never returns; the thread wakes up once per second only to go back to sleep.
+    while (true) Thread.sleep(1000)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
new file mode 100644
index 0000000..bd53276
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy.ApplicationDescription
+import java.util.Date
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+private[spark] class ApplicationInfo(
+    val startTime: Long,
+    val id: String,
+    val desc: ApplicationDescription,
+    val submitDate: Date,
+    val driver: ActorRef,
+    val appUiUrl: String)
+{
+  var state = ApplicationState.WAITING
+  var executors = new mutable.HashMap[Int, ExecutorInfo]
+  var coresGranted = 0
+  var endTime = -1L
+  val appSource = new ApplicationSource(this)
+
+  private var nextExecutorId = 0
+
+  def newExecutorId(): Int = {
+    val id = nextExecutorId
+    nextExecutorId += 1
+    id
+  }
+
+  def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
+    val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
+    executors(exec.id) = exec
+    coresGranted += cores
+    exec
+  }
+
+  def removeExecutor(exec: ExecutorInfo) {
+    if (executors.contains(exec.id)) {
+      executors -= exec.id
+      coresGranted -= exec.cores
+    }
+  }
+
+  def coresLeft: Int = desc.maxCores - coresGranted
+
+  private var _retryCount = 0
+
+  def retryCount = _retryCount
+
+  def incrementRetryCount = {
+    _retryCount += 1
+    _retryCount
+  }
+
+  def markFinished(endState: ApplicationState.Value) {
+    state = endState
+    endTime = System.currentTimeMillis()
+  }
+
+  def duration: Long = {
+    if (endTime != -1) {
+      endTime - startTime
+    } else {
+      System.currentTimeMillis() - startTime
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
new file mode 100644
index 0000000..2d75ad5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -0,0 +1,24 @@
+package org.apache.spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+class ApplicationSource(val application: ApplicationInfo) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "%s.%s.%s".format("application", application.desc.name,
+    System.currentTimeMillis())
+
+  metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
+    override def getValue: String = application.state.toString
+  })
+
+  metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
+    override def getValue: Long = application.duration
+  })
+
+  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+    override def getValue: Int = application.coresGranted
+  })
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
new file mode 100644
index 0000000..7e80422
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object ApplicationState
+  extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+
+  type ApplicationState = Value
+
+  val WAITING, RUNNING, FINISHED, FAILED = Value
+
+  val MAX_NUM_RETRY = 10
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
new file mode 100644
index 0000000..cf384a9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy.ExecutorState
+
+private[spark] class ExecutorInfo(
+    val id: Int,
+    val application: ApplicationInfo,
+    val worker: WorkerInfo,
+    val cores: Int,
+    val memory: Int) {
+
+  var state = ExecutorState.LAUNCHING
+
+  def fullId: String = application.id + "/" + id
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
new file mode 100644
index 0000000..869b2b2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import akka.actor._
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
+
+import org.apache.spark.{Logging, SparkException, Utils}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.ui.MasterWebUI
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.AkkaUtils
+
+
+private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
+  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
+  val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
+  val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
+  val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
+ 
+  var nextAppNumber = 0
+  val workers = new HashSet[WorkerInfo]
+  val idToWorker = new HashMap[String, WorkerInfo]
+  val actorToWorker = new HashMap[ActorRef, WorkerInfo]
+  val addressToWorker = new HashMap[Address, WorkerInfo]
+
+  val apps = new HashSet[ApplicationInfo]
+  val idToApp = new HashMap[String, ApplicationInfo]
+  val actorToApp = new HashMap[ActorRef, ApplicationInfo]
+  val addressToApp = new HashMap[Address, ApplicationInfo]
+
+  val waitingApps = new ArrayBuffer[ApplicationInfo]
+  val completedApps = new ArrayBuffer[ApplicationInfo]
+
+  var firstApp: Option[ApplicationInfo] = None
+
+  Utils.checkHost(host, "Expected hostname")
+
+  val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
+  val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
+  val masterSource = new MasterSource(this)
+
+  val webUi = new MasterWebUI(this, webUiPort)
+
+  val masterPublicAddress = {
+    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    if (envVar != null) envVar else host
+  }
+
+  // As a temporary workaround before better ways of configuring memory, we allow users to set
+  // a flag that will perform round-robin scheduling across the nodes (spreading out each app
+  // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
+  val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
+
+  override def preStart() {
+    logInfo("Starting Spark master at spark://" + host + ":" + port)
+    // Listen for remote client disconnection events, since they don't go through Akka's watch()
+    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+    webUi.start()
+    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
+
+    masterMetricsSystem.registerSource(masterSource)
+    masterMetricsSystem.start()
+    applicationMetricsSystem.start()
+  }
+
+  override def postStop() {
+    webUi.stop()
+    masterMetricsSystem.stop()
+    applicationMetricsSystem.stop()
+  }
+
+  override def receive = {
+    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
+      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+        host, workerPort, cores, Utils.megabytesToString(memory)))
+      if (idToWorker.contains(id)) {
+        sender ! RegisterWorkerFailed("Duplicate worker ID")
+      } else {
+        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
+        context.watch(sender)  // This doesn't work with remote actors but helps for testing
+        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
+        schedule()
+      }
+    }
+
+    case RegisterApplication(description) => {
+      logInfo("Registering app " + description.name)
+      val app = addApplication(description, sender)
+      logInfo("Registered app " + description.name + " with ID " + app.id)
+      waitingApps += app
+      context.watch(sender)  // This doesn't work with remote actors but helps for testing
+      sender ! RegisteredApplication(app.id)
+      schedule()
+    }
+
+    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
+      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
+      execOption match {
+        case Some(exec) => {
+          exec.state = state
+          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
+          if (ExecutorState.isFinished(state)) {
+            val appInfo = idToApp(appId)
+            // Remove this executor from the worker and app
+            logInfo("Removing executor " + exec.fullId + " because it is " + state)
+            appInfo.removeExecutor(exec)
+            exec.worker.removeExecutor(exec)
+
+            // Only retry certain number of times so we don't go into an infinite loop.
+            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+              schedule()
+            } else {
+              logError("Application %s with ID %s failed %d times, removing it".format(
+                appInfo.desc.name, appInfo.id, appInfo.retryCount))
+              removeApplication(appInfo, ApplicationState.FAILED)
+            }
+          }
+        }
+        case None =>
+          logWarning("Got status update for unknown executor " + appId + "/" + execId)
+      }
+    }
+
+    case Heartbeat(workerId) => {
+      idToWorker.get(workerId) match {
+        case Some(workerInfo) =>
+          workerInfo.lastHeartbeat = System.currentTimeMillis()
+        case None =>
+          logWarning("Got heartbeat from unregistered worker " + workerId)
+      }
+    }
+
+    case Terminated(actor) => {
+      // The disconnected actor could've been either a worker or an app; remove whichever of
+      // those we have an entry for in the corresponding actor hashmap
+      actorToWorker.get(actor).foreach(removeWorker)
+      actorToApp.get(actor).foreach(finishApplication)
+    }
+
+    case RemoteClientDisconnected(transport, address) => {
+      // The disconnected client could've been either a worker or an app; remove whichever it was
+      addressToWorker.get(address).foreach(removeWorker)
+      addressToApp.get(address).foreach(finishApplication)
+    }
+
+    case RemoteClientShutdown(transport, address) => {
+      // The disconnected client could've been either a worker or an app; remove whichever it was
+      addressToWorker.get(address).foreach(removeWorker)
+      addressToApp.get(address).foreach(finishApplication)
+    }
+
+    case RequestMasterState => {
+      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+    }
+
+    case CheckForWorkerTimeOut => {
+      timeOutDeadWorkers()
+    }
+  }
+
+  /**
+   * Can an app use the given worker? True if the worker has enough memory and we haven't already
+   * launched an executor for the app on it (right now the standalone backend doesn't like having
+   * two executors on the same worker).
+   */
+  def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
+    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
+  }
+
+  /**
+   * Schedule the currently available resources among waiting apps. This method will be called
+   * every time a new app joins or resource availability changes.
+   */
+  def schedule() {
+    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
+    // in the queue, then the second app, etc.
+    if (spreadOutApps) {
+      // Try to spread out each app among all the nodes, until it has all its cores
+      for (app <- waitingApps if app.coresLeft > 0) {
+        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+                                   .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+        val numUsable = usableWorkers.length
+        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
+        var pos = 0
+        while (toAssign > 0) {
+          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+            toAssign -= 1
+            assigned(pos) += 1
+          }
+          pos = (pos + 1) % numUsable
+        }
+        // Now that we've decided how many cores to give on each node, let's actually give them
+        for (pos <- 0 until numUsable) {
+          if (assigned(pos) > 0) {
+            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
+            launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
+            app.state = ApplicationState.RUNNING
+          }
+        }
+      }
+    } else {
+      // Pack each app into as few nodes as possible until we've assigned all its cores
+      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
+        for (app <- waitingApps if app.coresLeft > 0) {
+          if (canUse(app, worker)) {
+            val coresToUse = math.min(worker.coresFree, app.coresLeft)
+            if (coresToUse > 0) {
+              val exec = app.addExecutor(worker, coresToUse)
+              launchExecutor(worker, exec, app.desc.sparkHome)
+              app.state = ApplicationState.RUNNING
+            }
+          }
+        }
+      }
+    }
+  }
+
+  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
+    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
+    worker.addExecutor(exec)
+    worker.actor ! LaunchExecutor(
+      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+    exec.application.driver ! ExecutorAdded(
+      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
+  }
+
+  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
+    publicAddress: String): WorkerInfo = {
+    // There may be one or more refs to dead workers on this same node (w/ different ID's),
+    // remove them.
+    workers.filter { w =>
+      (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
+    }.foreach { w =>
+      workers -= w
+    }
+    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+    workers += worker
+    idToWorker(worker.id) = worker
+    actorToWorker(sender) = worker
+    addressToWorker(sender.path.address) = worker
+    worker
+  }
+
+  def removeWorker(worker: WorkerInfo) {
+    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
+    worker.setState(WorkerState.DEAD)
+    idToWorker -= worker.id
+    actorToWorker -= worker.actor
+    addressToWorker -= worker.actor.path.address
+    for (exec <- worker.executors.values) {
+      logInfo("Telling app of lost executor: " + exec.id)
+      exec.application.driver ! ExecutorUpdated(
+        exec.id, ExecutorState.LOST, Some("worker lost"), None)
+      exec.application.removeExecutor(exec)
+    }
+  }
+
+  /** Create an ApplicationInfo for `desc` and index it by id, driver actor and driver address. */
+  def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
+    val now = System.currentTimeMillis()
+    val date = new Date(now)
+    val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    applicationMetricsSystem.registerSource(app.appSource)
+    apps += app
+    idToApp(app.id) = app
+    actorToApp(driver) = app
+    addressToApp(driver.path.address) = app
+    if (firstApp == None) firstApp = Some(app)  // only the first registration sets this
+    // Warn, naming the app being added, when live workers exist but none has enough free memory.
+    val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
+    if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+      logWarning("Could not find any workers with enough memory for " + app.id)
+    }
+    app
+  }
+
+  def finishApplication(app: ApplicationInfo) {
+    removeApplication(app, ApplicationState.FINISHED)
+  }
+
+  def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
+    if (apps.contains(app)) {
+      logInfo("Removing app " + app.id)
+      apps -= app
+      idToApp -= app.id
+      actorToApp -= app.driver
+      addressToApp -= app.driver.path.address
+      if (completedApps.size >= RETAINED_APPLICATIONS) {
+        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+        completedApps.take(toRemove).foreach( a => {
+          applicationMetricsSystem.removeSource(a.appSource)
+        })
+        completedApps.trimStart(toRemove)
+      }
+      completedApps += app // Remember it in our history
+      waitingApps -= app
+      for (exec <- app.executors.values) {
+        exec.worker.removeExecutor(exec)
+        exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
+        exec.state = ExecutorState.KILLED
+      }
+      app.markFinished(state)
+      if (state != ApplicationState.FINISHED) {
+        app.driver ! ApplicationRemoved(state.toString)
+      }
+      schedule()
+    }
+  }
+
+  /** Generate a new app ID given an app's submission date */
+  def newApplicationId(submitDate: Date): String = {
+    val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
+    nextAppNumber += 1
+    appId
+  }
+
+  /** Check for, and remove, any timed-out workers */
+  def timeOutDeadWorkers() {
+    // Copy the workers into an array so we don't modify the hashset while iterating through it
+    val currentTime = System.currentTimeMillis()
+    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
+    for (worker <- toRemove) {
+      if (worker.state != WorkerState.DEAD) {
+        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+          worker.id, WORKER_TIMEOUT/1000))
+        removeWorker(worker)
+      } else {
+        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it 
+      }
+    }
+  }
+}
+
+private[spark] object Master {
+  private val systemName = "sparkMaster"
+  private val actorName = "Master"
+  private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+
+  /** Parse the arguments, start the master, then wait on actorSystem.awaitTermination(). */
+  def main(argStrings: Array[String]) {
+    val args = new MasterArguments(argStrings)
+    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+    actorSystem.awaitTermination()
+  }
+
+  /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:port`. */
+  def toAkkaUrl(sparkUrl: String): String = sparkUrl match {
+    case sparkUrlRegex(host, port) =>
+      "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+    case _ =>
+      throw new SparkException("Invalid master URL: " + sparkUrl)
+  }
+
+  /** Create the actor system and a Master actor in it; returns (system, port from AkkaUtils). */
+  def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+    actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
+    (actorSystem, boundPort)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
new file mode 100644
index 0000000..c86cca2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.util.IntParam
+import org.apache.spark.Utils
+
+/**
+ * Command-line parser for the master.
+ */
+private[spark] class MasterArguments(args: Array[String]) {
+  var host = Utils.localHostName()
+  var port = 7077
+  var webUiPort = 8080
+  
+  // Check for settings in environment variables and the master.ui.port system property
+  if (System.getenv("SPARK_MASTER_HOST") != null) {
+    host = System.getenv("SPARK_MASTER_HOST")
+  }
+  if (System.getenv("SPARK_MASTER_PORT") != null) {
+    port = System.getenv("SPARK_MASTER_PORT").toInt
+  }
+  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
+    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
+  }
+  if (System.getProperty("master.ui.port") != null) {
+    webUiPort = System.getProperty("master.ui.port").toInt
+  }
+
+  parse(args.toList)
+
+  def parse(args: List[String]): Unit = args match {  // consumes flags recursively, front to back
+    case ("--ip" | "-i") :: value :: tail =>
+      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
+      host = value
+      parse(tail)
+
+    case ("--host" | "-h") :: value :: tail =>  // "-h" followed by any argument means --host
+      Utils.checkHost(value, "Please use hostname " + value)
+      host = value
+      parse(tail)
+
+    case ("--port" | "-p") :: IntParam(value) :: tail =>
+      port = value
+      parse(tail)
+
+    case "--webui-port" :: IntParam(value) :: tail =>
+      webUiPort = value
+      parse(tail)
+
+    case ("--help" | "-h") :: tail =>  // "-h" only reaches here as the very last argument
+      printUsageAndExit(0)
+
+    case Nil => {}  // all arguments consumed
+
+    case _ =>  // anything not matched above is treated as a usage error
+      printUsageAndExit(1)
+  }
+
+  /**
+   * Print usage and exit JVM with the given exit code.
+   */
+  def printUsageAndExit(exitCode: Int) {
+    System.err.println(
+      "Usage: Master [options]\n" +
+      "\n" +
+      "Options:\n" +
+      "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h) \n" +
+      "  -h HOST, --host HOST   Hostname to listen on\n" +
+      "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
+      "  --webui-port PORT      Port for web UI (default: 8080)")
+    System.exit(exitCode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
new file mode 100644
index 0000000..8dd0a42
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -0,0 +1,25 @@
+package org.apache.spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class MasterSource(val master: Master) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "master"
+
+  // Gauge for worker numbers in cluster
+  metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
+    override def getValue: Int = master.workers.size
+  })
+
+  // Gauge for application numbers in cluster
+  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+    override def getValue: Int = master.apps.size
+  })
+
+  // Gauge for waiting application numbers in cluster
+  metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+    override def getValue: Int = master.waitingApps.size
+  })
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
new file mode 100644
index 0000000..285e07a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import akka.actor.ActorRef
+import scala.collection.mutable
+import org.apache.spark.Utils
+
+private[spark] class WorkerInfo(
+  val id: String,
+  val host: String,
+  val port: Int,
+  val cores: Int,
+  val memory: Int,
+  val actor: ActorRef,
+  val webUiPort: Int,
+  val publicAddress: String) {
+
+  Utils.checkHost(host, "Expected hostname")
+  assert (port > 0)
+
+  var executors = new mutable.HashMap[String, ExecutorInfo]  // fullId => info
+  var state: WorkerState.Value = WorkerState.ALIVE
+  var coresUsed = 0
+  var memoryUsed = 0
+
+  var lastHeartbeat = System.currentTimeMillis()
+
+  def coresFree: Int = cores - coresUsed
+  def memoryFree: Int = memory - memoryUsed
+
+  def hostPort: String = {
+    assert (port > 0)
+    host + ":" + port
+  }
+
+  def addExecutor(exec: ExecutorInfo) {
+    executors(exec.fullId) = exec
+    coresUsed += exec.cores
+    memoryUsed += exec.memory
+  }
+
+  def removeExecutor(exec: ExecutorInfo) {
+    if (executors.contains(exec.fullId)) {
+      executors -= exec.fullId
+      coresUsed -= exec.cores
+      memoryUsed -= exec.memory
+    }
+  }
+
+  def hasExecutor(app: ApplicationInfo): Boolean = {
+    executors.values.exists(_.application == app)
+  }
+
+  def webUiAddress : String = {
+    "http://" + this.publicAddress + ":" + this.webUiPort
+  }
+
+  def setState(state: WorkerState.Value) = {
+    this.state = state
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
new file mode 100644
index 0000000..b5ee6dc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+/**
+ * The states a worker can be in. The names passed to the `Enumeration` constructor are assigned
+ * to the values in declaration order.
+ */
+private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+  type WorkerState = Value
+
+  val ALIVE = Value
+  val DEAD = Value
+  val DECOMMISSIONED = Value
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
new file mode 100644
index 0000000..6435c7f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import scala.xml.Node
+
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+
+import javax.servlet.http.HttpServletRequest
+
+import net.liftweb.json.JsonAST.JValue
+
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.master.ExecutorInfo
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.Utils
+
+/**
+ * Master web UI page for a single application, selected by the "appId" request parameter and
+ * available as HTML (`render`) or JSON (`renderJson`).
+ */
+private[spark] class ApplicationPage(parent: MasterWebUI) {
+  val master = parent.masterActorRef
+  implicit val timeout = parent.timeout
+
+  /**
+   * Asks the master actor for its current state (blocking for the reply) and looks `appId` up
+   * among the active applications first, then among the completed ones.
+   *
+   * @return the matching application, or None when neither list contains `appId`
+   */
+  private def findApp(appId: String) = {
+    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+    val state = Await.result(stateFuture, 30 seconds)
+    state.activeApps.find(_.id == appId).orElse(state.completedApps.find(_.id == appId))
+  }
+
+  /**
+   * Executor details for a particular application, as JSON.
+   *
+   * An unknown (or missing) "appId" yields JSON null instead of failing with a
+   * NullPointerException.
+   */
+  def renderJson(request: HttpServletRequest): JValue = {
+    val appId = request.getParameter("appId")
+    findApp(appId) match {
+      case Some(app) => JsonProtocol.writeApplicationInfo(app)
+      case None => net.liftweb.json.JsonAST.JNull
+    }
+  }
+
+  /**
+   * Executor details for a particular application, as an HTML page.
+   *
+   * An unknown (or missing) "appId" yields a short "Not Found" page instead of failing with a
+   * NullPointerException.
+   */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val appId = request.getParameter("appId")
+    findApp(appId) match {
+      case None =>
+        // appId is embedded as XML text, so scala.xml escapes it on output.
+        val notFound = <div class="row-fluid">No application with ID {appId}</div>
+        UIUtils.basicSparkPage(notFound, "Not Found")
+
+      case Some(app) =>
+        val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
+        val executors = app.executors.values.toSeq
+        val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
+
+        val content =
+            <div class="row-fluid">
+              <div class="span12">
+                <ul class="unstyled">
+                  <li><strong>ID:</strong> {app.id}</li>
+                  <li><strong>Name:</strong> {app.desc.name}</li>
+                  <li><strong>User:</strong> {app.desc.user}</li>
+                  <li><strong>Cores:</strong>
+                    {
+                    if (app.desc.maxCores == Integer.MAX_VALUE) {
+                      "Unlimited (%s granted)".format(app.coresGranted)
+                    } else {
+                      "%s (%s granted, %s left)".format(
+                        app.desc.maxCores, app.coresGranted, app.coresLeft)
+                    }
+                    }
+                  </li>
+                  <li>
+                    <strong>Executor Memory:</strong>
+                    {Utils.megabytesToString(app.desc.memoryPerSlave)}
+                  </li>
+                  <li><strong>Submit Date:</strong> {app.submitDate}</li>
+                  <li><strong>State:</strong> {app.state}</li>
+                  <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li>
+                </ul>
+              </div>
+            </div>
+
+            <div class="row-fluid"> <!-- Executors -->
+              <div class="span12">
+                <h4> Executor Summary </h4>
+                {executorTable}
+              </div>
+            </div>;
+        UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
+    }
+  }
+
+  /**
+   * One table row for `executor`: its id, a link to its worker's web UI, its cores, memory and
+   * state, and links to the stdout/stderr log pages served by that worker's UI.
+   */
+  def executorRow(executor: ExecutorInfo): Seq[Node] = {
+    <tr>
+      <td>{executor.id}</td>
+      <td>
+        <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
+      </td>
+      <td>{executor.cores}</td>
+      <td>{executor.memory}</td>
+      <td>{executor.state}</td>
+      <td>
+        <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
+          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+        <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
+          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
+      </td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
new file mode 100644
index 0000000..58d3863
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+
+import net.liftweb.json.JsonAST.JValue
+
+import org.apache.spark.Utils
+import org.apache.spark.deploy.DeployWebUI
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.ui.UIUtils
+
+/**
+ * Index page of the master web UI: a summary of the master's state plus tables of workers,
+ * running applications and completed applications, available as HTML or JSON.
+ */
+private[spark] class IndexPage(parent: MasterWebUI) {
+  val master = parent.masterActorRef
+  implicit val timeout = parent.timeout
+
+  /** Requests the current state from the master actor and blocks until the reply arrives. */
+  private def fetchMasterState(): MasterStateResponse = {
+    val reply = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+    Await.result(reply, 30 seconds)
+  }
+
+  /** The master state as JSON, as produced by JsonProtocol.writeMasterState. */
+  def renderJson(request: HttpServletRequest): JValue = {
+    JsonProtocol.writeMasterState(fetchMasterState())
+  }
+
+  /** Index view listing applications and executors */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val state = fetchMasterState()
+
+    // Workers are listed in id order.
+    val sortedWorkers = state.workers.sortBy(_.id)
+    val workerColumns = Seq("Id", "Address", "State", "Cores", "Memory")
+    val workerTable = UIUtils.listingTable(workerColumns, workerRow, sortedWorkers)
+
+    // Both application tables share the same columns and list the latest entry first.
+    val appColumns = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
+      "State", "Duration")
+    val runningApps = state.activeApps.sortBy(_.startTime).reverse
+    val finishedApps = state.completedApps.sortBy(_.endTime).reverse
+    val activeAppsTable = UIUtils.listingTable(appColumns, appRow, runningApps)
+    val completedAppsTable = UIUtils.listingTable(appColumns, appRow, finishedApps)
+
+    val content =
+        <div class="row-fluid">
+          <div class="span12">
+            <ul class="unstyled">
+              <li><strong>URL:</strong> {state.uri}</li>
+              <li><strong>Workers:</strong> {state.workers.size}</li>
+              <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
+                {state.workers.map(_.coresUsed).sum} Used</li>
+              <li><strong>Memory:</strong>
+                {Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
+                {Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
+              <li><strong>Applications:</strong>
+                {state.activeApps.size} Running,
+                {state.completedApps.size} Completed </li>
+            </ul>
+          </div>
+        </div>
+
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Workers </h4>
+            {workerTable}
+          </div>
+        </div>
+
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Running Applications </h4>
+
+            {activeAppsTable}
+          </div>
+        </div>
+
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Completed Applications </h4>
+            {completedAppsTable}
+          </div>
+        </div>;
+    UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
+  }
+
+  /**
+   * One row of the workers table: a link to the worker's own web UI, its host:port, state,
+   * total/used cores and total/used memory. The memory cell carries a `sorttable_customkey`
+   * attribute built from the raw numbers.
+   */
+  def workerRow(worker: WorkerInfo): Seq[Node] = {
+    <tr>
+      <td>
+        <a href={worker.webUiAddress}>{worker.id}</a>
+      </td>
+      <td>{worker.host}:{worker.port}</td>
+      <td>{worker.state}</td>
+      <td>{worker.cores} ({worker.coresUsed} Used)</td>
+      <td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
+        {Utils.megabytesToString(worker.memory)}
+        ({Utils.megabytesToString(worker.memoryUsed)} Used)
+      </td>
+    </tr>
+  }
+
+
+  /**
+   * One row of an applications table: a link to the application's detail page ("app?appId=..."),
+   * its name linked to its own UI, cores granted, memory per slave, submit date, user, state
+   * and duration.
+   */
+  def appRow(app: ApplicationInfo): Seq[Node] = {
+    <tr>
+      <td>
+        <a href={"app?appId=" + app.id}>{app.id}</a>
+      </td>
+      <td>
+        <a href={app.appUiUrl}>{app.desc.name}</a>
+      </td>
+      <td>
+        {app.coresGranted}
+      </td>
+      <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
+        {Utils.megabytesToString(app.desc.memoryPerSlave)}
+      </td>
+      <td>{DeployWebUI.formatDate(app.submitDate)}</td>
+      <td>{app.desc.user}</td>
+      <td>{app.state.toString}</td>
+      <td>{DeployWebUI.formatDuration(app.duration)}</td>
+    </tr>
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
new file mode 100644
index 0000000..47b1e52
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import akka.util.Duration
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.{Handler, Server}
+
+import org.apache.spark.{Logging, Utils}
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.ui.JettyUtils
+import org.apache.spark.ui.JettyUtils._
+
+/**
+ * Web UI server for the standalone master.
+ *
+ * Serves the index and per-application pages (HTML and JSON), the static resources, and the
+ * servlet handlers exposed by the master's two metrics systems.
+ */
+private[spark]
+class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
+  // Timeout in seconds, taken from the "spark.akka.askTimeout" system property (default "10").
+  implicit val timeout = {
+    val askTimeoutSeconds = System.getProperty("spark.akka.askTimeout", "10").toLong
+    Duration.create(askTimeoutSeconds, "seconds")
+  }
+  val host = Utils.localHostName()
+  val port = requestedPort
+
+  val masterActorRef = master.self
+
+  // Both stay None until start() has brought the Jetty server up.
+  var server: Option[Server] = None
+  var boundPort: Option[Int] = None
+
+  // The pages read `masterActorRef` and `timeout` in their constructors, so they must be
+  // created after those fields.
+  val applicationPage = new ApplicationPage(this)
+  val indexPage = new IndexPage(this)
+
+  /**
+   * Starts the Jetty server on all interfaces at the requested port and records the server and
+   * the port it was bound to. Any exception is logged and terminates the JVM with exit code 1.
+   */
+  def start() {
+    try {
+      val started = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+      server = Some(started._1)
+      boundPort = Some(started._2)
+      logInfo("Started Master web UI at http://%s:%d".format(host, started._2))
+    } catch {
+      case e: Exception =>
+        logError("Failed to create Master JettyUtils", e)
+        System.exit(1)
+    }
+  }
+
+  val metricsHandlers = master.masterMetricsSystem.getServletHandlers ++
+    master.applicationMetricsSystem.getServletHandlers
+
+  // Path -> handler table given to Jetty: metrics handlers first, then the UI's own routes.
+  val handlers = metricsHandlers ++ Array[(String, Handler)](
+    ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
+    ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
+    ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
+    ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
+    ("*", (request: HttpServletRequest) => indexPage.render(request))
+  )
+
+  /** Stops the Jetty server if start() created one; otherwise does nothing. */
+  def stop() {
+    for (srv <- server) {
+      srv.stop()
+    }
+  }
+}
+
+/** Constants for the master web UI. */
+private[spark] object MasterWebUI {
+  // Resource directory handed to createStaticHandler for the "/static" route.
+  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
new file mode 100644
index 0000000..01ce4a6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import java.io._
+import java.lang.System.getenv
+
+import akka.actor.ActorRef
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+import org.apache.spark.{Utils, Logging}
+import org.apache.spark.deploy.{ExecutorState, ApplicationDescription}
+import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+
+/**
+ * Manages the execution of one executor process.
+ *
+ * `start()` launches a dedicated thread that builds the java command line, starts the child
+ * process in its own working directory, redirects its output to files and waits for it to exit.
+ * State changes are reported to `worker` as ExecutorStateChanged messages.
+ *
+ * @param appId     id of the application the executor belongs to
+ * @param execId    id of this executor; substituted for {{EXECUTOR_ID}}
+ * @param appDesc   application description; its `command` supplies the main class, arguments
+ *                  and environment of the child process
+ * @param cores     number of cores; substituted for {{CORES}}
+ * @param memory    heap size in megabytes (passed as -Xms/-Xmx)
+ * @param worker    actor that receives ExecutorStateChanged messages
+ * @param workerId  id of the owning worker (not used by this class itself)
+ * @param host      host name; substituted for {{HOSTNAME}}
+ * @param sparkHome Spark installation whose bin/compute-classpath script is invoked
+ * @param workDir   directory under which the executor directory "appId/execId" is created
+ */
+private[spark] class ExecutorRunner(
+    val appId: String,
+    val execId: Int,
+    val appDesc: ApplicationDescription,
+    val cores: Int,
+    val memory: Int,
+    val worker: ActorRef,
+    val workerId: String,
+    val host: String,
+    val sparkHome: File,
+    val workDir: File)
+  extends Logging {
+
+  val fullId = appId + "/" + execId
+  var workerThread: Thread = null
+  var process: Process = null
+  var shutdownHook: Thread = null
+
+  /** Looks `key` up in the application's environment first, then in this JVM's environment. */
+  private def getAppEnv(key: String): Option[String] =
+    appDesc.command.environment.get(key).orElse(Option(getenv(key)))
+
+  /** Starts the runner thread and registers a JVM shutdown hook that kills the child process. */
+  def start() {
+    workerThread = new Thread("ExecutorRunner for " + fullId) {
+      override def run() { fetchAndRunExecutor() }
+    }
+    workerThread.start()
+
+    // Shutdown hook that kills the child process (if one was started) when the JVM exits.
+    shutdownHook = new Thread() {
+      override def run() {
+        if (process != null) {
+          logInfo("Shutdown hook killing child process.")
+          process.destroy()
+          process.waitFor()
+        }
+      }
+    }
+    Runtime.getRuntime.addShutdownHook(shutdownHook)
+  }
+
+  /** Stop this executor runner, including killing the process it launched */
+  def kill() {
+    if (workerThread != null) {
+      workerThread.interrupt()
+      workerThread = null
+      if (process != null) {
+        logInfo("Killing process!")
+        process.destroy()
+        process.waitFor()
+      }
+      worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
+      Runtime.getRuntime.removeShutdownHook(shutdownHook)
+    }
+  }
+
+  /**
+   * Replace variables such as {{EXECUTOR_ID}}, {{HOSTNAME}} and {{CORES}} in a command argument
+   * passed to us. Only an argument that consists of exactly one placeholder is replaced; any
+   * other argument is returned unchanged.
+   */
+  def substituteVariables(argument: String): String = argument match {
+    case "{{EXECUTOR_ID}}" => execId.toString
+    case "{{HOSTNAME}}" => host
+    case "{{CORES}}" => cores.toString
+    case other => other
+  }
+
+  /**
+   * Full command line of the child process: the java binary (from JAVA_HOME when set, otherwise
+   * "java" from the PATH), the JVM options, the main class and the substituted arguments.
+   */
+  def buildCommandSeq(): Seq[String] = {
+    val command = appDesc.command
+    val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
+    // SPARK-698: do not call the run.cmd script, as process.destroy()
+    // fails to kill a process tree on Windows
+    Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
+      command.arguments.map(substituteVariables)
+  }
+
+  /**
+   * Attention: this must always be aligned with the environment variables in the run scripts and
+   * the way the JAVA_OPTS are assembled there.
+   *
+   * The result is "-cp <classpath>", then the library path option, the worker-local
+   * SPARK_JAVA_OPTS, the application's SPARK_JAVA_OPTS and finally the -Xms/-Xmx settings.
+   */
+  def buildJavaOpts(): Seq[String] = {
+    val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
+      .map(p => List("-Djava.library.path=" + p))
+      .getOrElse(Nil)
+    val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
+      .map(Utils.splitCommandString)
+      .getOrElse(Nil)
+    // getAppEnv falls back to this JVM's environment, so when the application does not define
+    // SPARK_JAVA_OPTS the worker-local options are listed a second time here.
+    val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
+    val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
+
+    // Figure out our classpath with the external compute-classpath script
+    val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
+    val classPath = Utils.executeAndGetOutput(
+        Seq(sparkHome + "/bin/compute-classpath" + ext),
+        extraEnvironment=appDesc.command.environment)
+
+    Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
+  }
+
+  /** Spawn a thread that will redirect a given stream to a file, appending to existing content */
+  def redirectStream(in: InputStream, file: File) {
+    val out = new FileOutputStream(file, true)
+    new Thread("redirect output to " + file) {
+      override def run() {
+        try {
+          Utils.copyStream(in, out, true)
+        } catch {
+          case e: IOException =>
+            logInfo("Redirection to " + file + " closed: " + e.getMessage)
+        }
+      }
+    }.start()
+  }
+
+  /**
+   * Download and run the executor described in our ApplicationDescription
+   *
+   * Runs on the runner thread. Reports FAILED to the worker when the process exits or when
+   * launching it fails; an interrupt (sent by kill()) ends the method without a report, because
+   * kill() sends the KILLED message itself.
+   */
+  def fetchAndRunExecutor() {
+    try {
+      // Create the executor's working directory
+      val executorDir = new File(workDir, appId + "/" + execId)
+      if (!executorDir.mkdirs()) {
+        throw new IOException("Failed to create directory " + executorDir)
+      }
+
+      // Launch the process
+      val command = buildCommandSeq()
+      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+      val builder = new ProcessBuilder(command: _*).directory(executorDir)
+      val env = builder.environment()
+      for ((key, value) <- appDesc.command.environment) {
+        env.put(key, value)
+      }
+      // In case we are running this from within the Spark Shell, avoid creating a "scala"
+      // parent process for the executor command
+      env.put("SPARK_LAUNCH_WITH_SCALA", "0")
+      process = builder.start()
+
+      val header = "Spark Executor Command: %s\n%s\n\n".format(
+        command.mkString("\"", "\" \"", "\""), "=" * 40)
+
+      // Redirect its stdout and stderr to files
+      val stdout = new File(executorDir, "stdout")
+      redirectStream(process.getInputStream, stdout)
+
+      // The header is written before the redirect starts; the redirect opens the file in
+      // append mode, so the process output follows the header.
+      val stderr = new File(executorDir, "stderr")
+      Files.write(header, stderr, Charsets.UTF_8)
+      redirectStream(process.getErrorStream, stderr)
+
+      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
+      // long-lived processes only. However, in the future, we might restart the executor a few
+      // times on the same machine.
+      val exitCode = process.waitFor()
+      val message = "Command exited with code " + exitCode
+      worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
+                                    Some(exitCode))
+    } catch {
+      case interrupted: InterruptedException =>
+        logInfo("Runner thread for executor " + fullId + " interrupted")
+        // kill() may have run before `process` was assigned, in which case it could not destroy
+        // the child itself; destroy it here so the child cannot outlive this runner.
+        if (process != null) {
+          process.destroy()
+        }
+
+      case e: Exception => {
+        logError("Error running executor", e)
+        if (process != null) {
+          process.destroy()
+        }
+        val message = e.getClass + ": " + e.getMessage
+        worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
+      }
+    }
+  }
+}