Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:24 UTC
[40/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/Command.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala
new file mode 100644
index 0000000..fa8af9a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.Map
+
+private[spark] case class Command(
+ mainClass: String,
+ arguments: Seq[String],
+ environment: Map[String, String]) {
+}
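
A minimal usage sketch (not part of the patch; it assumes a call site inside the
org.apache.spark package, since the class is private[spark], and the main class,
arguments, and environment values below are made up):

    import org.apache.spark.deploy.Command

    // Describes how a worker should launch a child process: the main class to
    // run, its arguments, and the environment to run it under.
    val cmd = Command(
      "org.apache.spark.FakeExecutorMain",   // hypothetical main class
      Seq("--cores", "4"),                   // hypothetical arguments
      Map("SPARK_MEM" -> "512m"))            // hypothetical environment
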
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
new file mode 100644
index 0000000..4dc6ada
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.immutable.List
+
+import org.apache.spark.Utils
+import org.apache.spark.deploy.ExecutorState.ExecutorState
+import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.worker.ExecutorRunner
+
+
+private[deploy] sealed trait DeployMessage extends Serializable
+
+private[deploy] object DeployMessages {
+
+ // Worker to Master
+
+ case class RegisterWorker(
+ id: String,
+ host: String,
+ port: Int,
+ cores: Int,
+ memory: Int,
+ webUiPort: Int,
+ publicAddress: String)
+ extends DeployMessage {
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+ }
+
+ case class ExecutorStateChanged(
+ appId: String,
+ execId: Int,
+ state: ExecutorState,
+ message: Option[String],
+ exitStatus: Option[Int])
+ extends DeployMessage
+
+ case class Heartbeat(workerId: String) extends DeployMessage
+
+ // Master to Worker
+
+ case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
+
+ case class RegisterWorkerFailed(message: String) extends DeployMessage
+
+ case class KillExecutor(appId: String, execId: Int) extends DeployMessage
+
+ case class LaunchExecutor(
+ appId: String,
+ execId: Int,
+ appDesc: ApplicationDescription,
+ cores: Int,
+ memory: Int,
+ sparkHome: String)
+ extends DeployMessage
+
+ // Client to Master
+
+ case class RegisterApplication(appDescription: ApplicationDescription)
+ extends DeployMessage
+
+ // Master to Client
+
+ case class RegisteredApplication(appId: String) extends DeployMessage
+
+ // TODO(matei): replace hostPort with host
+ case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
+ Utils.checkHostPort(hostPort, "Required hostport")
+ }
+
+ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
+ exitStatus: Option[Int])
+
+ case class ApplicationRemoved(message: String)
+
+ // Internal message in Client
+
+ case object StopClient
+
+ // MasterWebUI To Master
+
+ case object RequestMasterState
+
+ // Master to MasterWebUI
+
+ case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
+ activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+
+ def uri = "spark://" + host + ":" + port
+ }
+
+ // WorkerWebUI to Worker
+
+ case object RequestWorkerState
+
+ // Worker to WorkerWebUI
+
+ case class WorkerStateResponse(host: String, port: Int, workerId: String,
+ executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
+ cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
+
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+ }
+
+ // Actor System to Master
+
+ case object CheckForWorkerTimeOut
+
+}
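
For orientation, a sketch of how a receiver can dispatch on these messages (not
part of the patch; assumes code living inside org.apache.spark.deploy, since the
trait is private[deploy]):

    import org.apache.spark.deploy.DeployMessages._

    // Every message is a plain serializable case class or object, so handlers
    // are ordinary pattern matches.
    def describe(msg: DeployMessage): String = msg match {
      case Heartbeat(workerId) =>
        "heartbeat from worker " + workerId
      case ExecutorStateChanged(appId, execId, state, _, _) =>
        "executor " + appId + "/" + execId + " is now " + state
      case other =>
        "unhandled: " + other
    }
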
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
new file mode 100644
index 0000000..fcfea96
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+private[spark] object ExecutorState
+ extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
+
+ val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+
+ type ExecutorState = Value
+
+ def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+}
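
Usage sketch (assumes a call site inside the org.apache.spark package, since the
object is private[spark]):

    import org.apache.spark.deploy.ExecutorState

    // LAUNCHING, LOADING and RUNNING are live states; KILLED, FAILED and LOST
    // are terminal, which is exactly what isFinished tests.
    assert(!ExecutorState.isFinished(ExecutorState.RUNNING))
    assert(ExecutorState.isFinished(ExecutorState.LOST))
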
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
new file mode 100644
index 0000000..a6be8ef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import net.liftweb.json.JsonDSL._
+
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
+import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.worker.ExecutorRunner
+
+
+private[spark] object JsonProtocol {
+ def writeWorkerInfo(obj: WorkerInfo) = {
+ ("id" -> obj.id) ~
+ ("host" -> obj.host) ~
+ ("port" -> obj.port) ~
+ ("webuiaddress" -> obj.webUiAddress) ~
+ ("cores" -> obj.cores) ~
+ ("coresused" -> obj.coresUsed) ~
+ ("memory" -> obj.memory) ~
+ ("memoryused" -> obj.memoryUsed) ~
+ ("state" -> obj.state.toString)
+ }
+
+ def writeApplicationInfo(obj: ApplicationInfo) = {
+ ("starttime" -> obj.startTime) ~
+ ("id" -> obj.id) ~
+ ("name" -> obj.desc.name) ~
+ ("cores" -> obj.desc.maxCores) ~
+ ("user" -> obj.desc.user) ~
+ ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+ ("submitdate" -> obj.submitDate.toString)
+ }
+
+ def writeApplicationDescription(obj: ApplicationDescription) = {
+ ("name" -> obj.name) ~
+ ("cores" -> obj.maxCores) ~
+ ("memoryperslave" -> obj.memoryPerSlave) ~
+ ("user" -> obj.user)
+ }
+
+ def writeExecutorRunner(obj: ExecutorRunner) = {
+ ("id" -> obj.execId) ~
+ ("memory" -> obj.memory) ~
+ ("appid" -> obj.appId) ~
+ ("appdesc" -> writeApplicationDescription(obj.appDesc))
+ }
+
+ def writeMasterState(obj: MasterStateResponse) = {
+ ("url" -> ("spark://" + obj.uri)) ~
+ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
+ ("cores" -> obj.workers.map(_.cores).sum) ~
+ ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
+ ("memory" -> obj.workers.map(_.memory).sum) ~
+ ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
+ ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
+ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
+ }
+
+ def writeWorkerState(obj: WorkerStateResponse) = {
+ ("id" -> obj.workerId) ~
+ ("masterurl" -> obj.masterUrl) ~
+ ("masterwebuiurl" -> obj.masterWebUiUrl) ~
+ ("cores" -> obj.cores) ~
+ ("coresused" -> obj.coresUsed) ~
+ ("memory" -> obj.memory) ~
+ ("memoryused" -> obj.memoryUsed) ~
+ ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
+ ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
+ }
+}
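
These builders produce lift-json ASTs rather than strings, so a caller still has
to render them. A sketch of that last step (assuming lift-json's 2.x Printer
API, which this file already depends on):

    import net.liftweb.json.{JsonAST, Printer}

    // Render any of the JValues built above into a compact JSON string.
    def toJsonString(json: JsonAST.JValue): String =
      Printer.compact(JsonAST.render(json))
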
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
new file mode 100644
index 0000000..af5a411
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.AkkaUtils
+import org.apache.spark.{Logging, Utils}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Testing class that creates a Spark standalone cluster in-process (that is, running the
+ * org.apache.spark.deploy.master.Master and org.apache.spark.deploy.worker.Worker actors in the
+ * same JVM). Executors launched by the Workers still run in separate JVMs. This can be used to
+ * test distributed operation and fault recovery without spinning up a lot of processes.
+ */
+private[spark]
+class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
+
+ private val localHostname = Utils.localHostName()
+ private val masterActorSystems = ArrayBuffer[ActorSystem]()
+ private val workerActorSystems = ArrayBuffer[ActorSystem]()
+
+ def start(): String = {
+ logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
+
+ /* Start the Master */
+ val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
+ masterActorSystems += masterSystem
+ val masterUrl = "spark://" + localHostname + ":" + masterPort
+
+ /* Start the Workers */
+ for (workerNum <- 1 to numWorkers) {
+ val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
+ memoryPerWorker, masterUrl, null, Some(workerNum))
+ workerActorSystems += workerSystem
+ }
+
+ return masterUrl
+ }
+
+ def stop() {
+ logInfo("Shutting down local Spark cluster.")
+ // Stop the workers before the master so they don't get upset that it disconnected
+ workerActorSystems.foreach(_.shutdown())
+ workerActorSystems.foreach(_.awaitTermination())
+
+ masterActorSystems.foreach(_.shutdown())
+ masterActorSystems.foreach(_.awaitTermination())
+ }
+}
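
Typical use in a test might look like the following sketch (not part of the
patch; assumes a call site inside the org.apache.spark package and that 512 MB
per worker is enough for the job):

    import org.apache.spark.deploy.LocalSparkCluster

    val cluster = new LocalSparkCluster(numWorkers = 2, coresPerWorker = 1,
      memoryPerWorker = 512)
    val masterUrl = cluster.start()  // e.g. "spark://<local-hostname>:<port>"
    try {
      // ... point a test client or SparkContext at masterUrl ...
    } finally {
      cluster.stop()                 // workers shut down first, then the master
    }
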
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
new file mode 100644
index 0000000..0a5f4c3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+
+
+/**
+ * Contains utility methods for interacting with Hadoop from Spark.
+ */
+class SparkHadoopUtil {
+
+ // Returns an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
+ def newConfiguration(): Configuration = new Configuration()
+
+ // Add any user credentials to the job conf that are necessary for running on a secure Hadoop cluster.
+ def addCredentials(conf: JobConf) {}
+
+ def isYarnMode(): Boolean = { false }
+
+}
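
In this default implementation the methods are essentially pass-throughs
(isYarnMode is false and addCredentials is a no-op). A calling sketch:

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val hadoopUtil = new SparkHadoopUtil
    val conf = hadoopUtil.newConfiguration()  // a plain Hadoop Configuration here
    val jobConf = new JobConf(conf)
    hadoopUtil.addCredentials(jobConf)        // no-op outside secure/YARN setups
    assert(!hadoopUtil.isYarnMode())
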
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
new file mode 100644
index 0000000..ae258b5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/WebUI.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object DeployWebUI {
+ val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+ def formatDate(date: Date): String = DATE_FORMAT.format(date)
+
+ def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+
+ def formatDuration(milliseconds: Long): String = {
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return "%.0f s".format(seconds)
+ }
+ val minutes = seconds / 60
+ if (minutes < 10) {
+ return "%.1f min".format(minutes)
+ } else if (minutes < 60) {
+ return "%.0f min".format(minutes)
+ }
+ val hours = minutes / 60
+ return "%.1f h".format(hours)
+ }
+}
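
The duration formatter picks its unit and precision by magnitude. A few worked
values, derived from the branches above (call site assumed inside the
org.apache.spark package, since the object is private[spark]):

    import org.apache.spark.deploy.DeployWebUI

    DeployWebUI.formatDuration(45 * 1000L)        // "45 s"
    DeployWebUI.formatDuration(5 * 60 * 1000L)    // "5.0 min" (under 10 min: one decimal)
    DeployWebUI.formatDuration(30 * 60 * 1000L)   // "30 min"
    DeployWebUI.formatDuration(2 * 3600 * 1000L)  // "2.0 h"
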
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
new file mode 100644
index 0000000..a342dd7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import java.util.concurrent.TimeoutException
+
+import akka.actor._
+import akka.actor.Terminated
+import akka.pattern.ask
+import akka.util.Duration
+import akka.remote.RemoteClientDisconnected
+import akka.remote.RemoteClientLifeCycleEvent
+import akka.remote.RemoteClientShutdown
+import akka.dispatch.Await
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.Master
+
+
+/**
+ * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
+ * and a listener for cluster events, and calls back the listener when various events occur.
+ */
+private[spark] class Client(
+ actorSystem: ActorSystem,
+ masterUrl: String,
+ appDescription: ApplicationDescription,
+ listener: ClientListener)
+ extends Logging {
+
+ var actor: ActorRef = null
+ var appId: String = null
+
+ class ClientActor extends Actor with Logging {
+ var master: ActorRef = null
+ var masterAddress: Address = null
+ var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
+
+ override def preStart() {
+ logInfo("Connecting to master " + masterUrl)
+ try {
+ master = context.actorFor(Master.toAkkaUrl(masterUrl))
+ masterAddress = master.path.address
+ master ! RegisterApplication(appDescription)
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to master", e)
+ markDisconnected()
+ context.stop(self)
+ }
+ }
+
+ override def receive = {
+ case RegisteredApplication(appId_) =>
+ appId = appId_
+ listener.connected(appId)
+
+ case ApplicationRemoved(message) =>
+ logError("Master removed our application: %s; stopping client".format(message))
+ markDisconnected()
+ context.stop(self)
+
+ case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
+ val fullId = appId + "/" + id
+ logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
+ listener.executorAdded(fullId, workerId, hostPort, cores, memory)
+
+ case ExecutorUpdated(id, state, message, exitStatus) =>
+ val fullId = appId + "/" + id
+ val messageText = message.map(s => " (" + s + ")").getOrElse("")
+ logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
+ if (ExecutorState.isFinished(state)) {
+ listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
+ }
+
+ case Terminated(actor_) if actor_ == master =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientShutdown(transport, address) if address == masterAddress =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case StopClient =>
+ markDisconnected()
+ sender ! true
+ context.stop(self)
+ }
+
+ /**
+ * Notify the listener that we disconnected, if we hadn't already done so before.
+ */
+ def markDisconnected() {
+ if (!alreadyDisconnected) {
+ listener.disconnected()
+ alreadyDisconnected = true
+ }
+ }
+ }
+
+ def start() {
+ // Just launch an actor; it will call back into the listener.
+ actor = actorSystem.actorOf(Props(new ClientActor))
+ }
+
+ def stop() {
+ if (actor != null) {
+ try {
+ val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val future = actor.ask(StopClient)(timeout)
+ Await.result(future, timeout)
+ } catch {
+ case e: TimeoutException =>
+ logInfo("Stop request to Master timed out; it may already be shut down.")
+ }
+ actor = null
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
new file mode 100644
index 0000000..4605368
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+/**
+ * Callbacks invoked by deploy client when various events happen. There are currently four events:
+ * connecting to the cluster, disconnecting, being given an executor, and having an executor
+ * removed (either due to failure or due to revocation).
+ *
+ * Users of this API should *not* block inside the callback methods.
+ */
+private[spark] trait ClientListener {
+ def connected(appId: String): Unit
+
+ def disconnected(): Unit
+
+ def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
+
+ def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
new file mode 100644
index 0000000..0322029
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import org.apache.spark.util.AkkaUtils
+import org.apache.spark.{Logging, Utils}
+import org.apache.spark.deploy.{Command, ApplicationDescription}
+
+private[spark] object TestClient {
+
+ class TestListener extends ClientListener with Logging {
+ def connected(id: String) {
+ logInfo("Connected to master, got app ID " + id)
+ }
+
+ def disconnected() {
+ logInfo("Disconnected from master")
+ System.exit(0)
+ }
+
+ def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {}
+
+ def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
+ }
+
+ def main(args: Array[String]) {
+ val url = args(0)
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
+ val desc = new ApplicationDescription(
+ "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
+ val listener = new TestListener
+ val client = new Client(actorSystem, url, desc, listener)
+ client.start()
+ actorSystem.awaitTermination()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala
new file mode 100644
index 0000000..c5ac45c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+private[spark] object TestExecutor {
+ def main(args: Array[String]) {
+ println("Hello world!")
+ while (true) {
+ Thread.sleep(1000)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
new file mode 100644
index 0000000..bd53276
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy.ApplicationDescription
+import java.util.Date
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+private[spark] class ApplicationInfo(
+ val startTime: Long,
+ val id: String,
+ val desc: ApplicationDescription,
+ val submitDate: Date,
+ val driver: ActorRef,
+ val appUiUrl: String)
+{
+ var state = ApplicationState.WAITING
+ var executors = new mutable.HashMap[Int, ExecutorInfo]
+ var coresGranted = 0
+ var endTime = -1L
+ val appSource = new ApplicationSource(this)
+
+ private var nextExecutorId = 0
+
+ def newExecutorId(): Int = {
+ val id = nextExecutorId
+ nextExecutorId += 1
+ id
+ }
+
+ def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
+ val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
+ executors(exec.id) = exec
+ coresGranted += cores
+ exec
+ }
+
+ def removeExecutor(exec: ExecutorInfo) {
+ if (executors.contains(exec.id)) {
+ executors -= exec.id
+ coresGranted -= exec.cores
+ }
+ }
+
+ def coresLeft: Int = desc.maxCores - coresGranted
+
+ private var _retryCount = 0
+
+ def retryCount = _retryCount
+
+ def incrementRetryCount = {
+ _retryCount += 1
+ _retryCount
+ }
+
+ def markFinished(endState: ApplicationState.Value) {
+ state = endState
+ endTime = System.currentTimeMillis()
+ }
+
+ def duration: Long = {
+ if (endTime != -1) {
+ endTime - startTime
+ } else {
+ System.currentTimeMillis() - startTime
+ }
+ }
+
+}
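
The core accounting maintains the invariant coresLeft == desc.maxCores -
coresGranted. A sketch for a freshly registered app (the app and worker objects
are assumed to already exist, inside org.apache.spark.deploy.master):

    val exec = app.addExecutor(worker, 4)           // grants 4 cores
    assert(app.coresLeft == app.desc.maxCores - 4)
    app.removeExecutor(exec)                        // hands the 4 cores back
    assert(app.coresLeft == app.desc.maxCores)
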
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
new file mode 100644
index 0000000..2d75ad5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -0,0 +1,24 @@
+package org.apache.spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+class ApplicationSource(val application: ApplicationInfo) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "%s.%s.%s".format("application", application.desc.name,
+ System.currentTimeMillis())
+
+ metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
+ override def getValue: String = application.state.toString
+ })
+
+ metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
+ override def getValue: Long = application.duration
+ })
+
+ metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+ override def getValue: Int = application.coresGranted
+ })
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
new file mode 100644
index 0000000..7e80422
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object ApplicationState
+ extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+
+ type ApplicationState = Value
+
+ val WAITING, RUNNING, FINISHED, FAILED = Value
+
+ val MAX_NUM_RETRY = 10
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
new file mode 100644
index 0000000..cf384a9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy.ExecutorState
+
+private[spark] class ExecutorInfo(
+ val id: Int,
+ val application: ApplicationInfo,
+ val worker: WorkerInfo,
+ val cores: Int,
+ val memory: Int) {
+
+ var state = ExecutorState.LAUNCHING
+
+ def fullId: String = application.id + "/" + id
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
new file mode 100644
index 0000000..869b2b2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import akka.actor._
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
+
+import org.apache.spark.{Logging, SparkException, Utils}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.ui.MasterWebUI
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.AkkaUtils
+
+
+private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
+ val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
+ val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
+
+ var nextAppNumber = 0
+ val workers = new HashSet[WorkerInfo]
+ val idToWorker = new HashMap[String, WorkerInfo]
+ val actorToWorker = new HashMap[ActorRef, WorkerInfo]
+ val addressToWorker = new HashMap[Address, WorkerInfo]
+
+ val apps = new HashSet[ApplicationInfo]
+ val idToApp = new HashMap[String, ApplicationInfo]
+ val actorToApp = new HashMap[ActorRef, ApplicationInfo]
+ val addressToApp = new HashMap[Address, ApplicationInfo]
+
+ val waitingApps = new ArrayBuffer[ApplicationInfo]
+ val completedApps = new ArrayBuffer[ApplicationInfo]
+
+ var firstApp: Option[ApplicationInfo] = None
+
+ Utils.checkHost(host, "Expected hostname")
+
+ val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
+ val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
+ val masterSource = new MasterSource(this)
+
+ val webUi = new MasterWebUI(this, webUiPort)
+
+ val masterPublicAddress = {
+ val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ if (envVar != null) envVar else host
+ }
+
+ // As a temporary workaround before better ways of configuring memory, we allow users to set
+ // a flag that will perform round-robin scheduling across the nodes (spreading out each app
+ // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
+ val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
+
+ override def preStart() {
+ logInfo("Starting Spark master at spark://" + host + ":" + port)
+ // Listen for remote client disconnection events, since they don't go through Akka's watch()
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ webUi.start()
+ context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
+
+ masterMetricsSystem.registerSource(masterSource)
+ masterMetricsSystem.start()
+ applicationMetricsSystem.start()
+ }
+
+ override def postStop() {
+ webUi.stop()
+ masterMetricsSystem.stop()
+ applicationMetricsSystem.stop()
+ }
+
+ override def receive = {
+ case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
+ logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+ host, workerPort, cores, Utils.megabytesToString(memory)))
+ if (idToWorker.contains(id)) {
+ sender ! RegisterWorkerFailed("Duplicate worker ID")
+ } else {
+ addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
+ context.watch(sender) // This doesn't work with remote actors but helps for testing
+ sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
+ schedule()
+ }
+ }
+
+ case RegisterApplication(description) => {
+ logInfo("Registering app " + description.name)
+ val app = addApplication(description, sender)
+ logInfo("Registered app " + description.name + " with ID " + app.id)
+ waitingApps += app
+ context.watch(sender) // This doesn't work with remote actors but helps for testing
+ sender ! RegisteredApplication(app.id)
+ schedule()
+ }
+
+ case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
+ val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
+ execOption match {
+ case Some(exec) => {
+ exec.state = state
+ exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
+ if (ExecutorState.isFinished(state)) {
+ val appInfo = idToApp(appId)
+ // Remove this executor from the worker and app
+ logInfo("Removing executor " + exec.fullId + " because it is " + state)
+ appInfo.removeExecutor(exec)
+ exec.worker.removeExecutor(exec)
+
+ // Only retry a certain number of times so we don't go into an infinite loop.
+ if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+ schedule()
+ } else {
+ logError("Application %s with ID %s failed %d times, removing it".format(
+ appInfo.desc.name, appInfo.id, appInfo.retryCount))
+ removeApplication(appInfo, ApplicationState.FAILED)
+ }
+ }
+ }
+ case None =>
+ logWarning("Got status update for unknown executor " + appId + "/" + execId)
+ }
+ }
+
+ case Heartbeat(workerId) => {
+ idToWorker.get(workerId) match {
+ case Some(workerInfo) =>
+ workerInfo.lastHeartbeat = System.currentTimeMillis()
+ case None =>
+ logWarning("Got heartbeat from unregistered worker " + workerId)
+ }
+ }
+
+ case Terminated(actor) => {
+ // The disconnected actor could've been either a worker or an app; remove whichever of
+ // those we have an entry for in the corresponding actor hashmap
+ actorToWorker.get(actor).foreach(removeWorker)
+ actorToApp.get(actor).foreach(finishApplication)
+ }
+
+ case RemoteClientDisconnected(transport, address) => {
+ // The disconnected client could've been either a worker or an app; remove whichever it was
+ addressToWorker.get(address).foreach(removeWorker)
+ addressToApp.get(address).foreach(finishApplication)
+ }
+
+ case RemoteClientShutdown(transport, address) => {
+ // The disconnected client could've been either a worker or an app; remove whichever it was
+ addressToWorker.get(address).foreach(removeWorker)
+ addressToApp.get(address).foreach(finishApplication)
+ }
+
+ case RequestMasterState => {
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ }
+
+ case CheckForWorkerTimeOut => {
+ timeOutDeadWorkers()
+ }
+ }
+
+ /**
+ * Can an app use the given worker? True if the worker has enough memory and we haven't already
+ * launched an executor for the app on it (right now the standalone backend doesn't like having
+ * two executors on the same worker).
+ */
+ def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
+ worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
+ }
+
+ /**
+ * Schedule the currently available resources among waiting apps. This method will be called
+ * every time a new app joins or resource availability changes.
+ */
+ def schedule() {
+ // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
+ // in the queue, then the second app, etc.
+ if (spreadOutApps) {
+ // Try to spread out each app among all the nodes, until it has all its cores
+ for (app <- waitingApps if app.coresLeft > 0) {
+ val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+ .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+ val numUsable = usableWorkers.length
+ val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+ var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
+ var pos = 0
+ while (toAssign > 0) {
+ if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+ toAssign -= 1
+ assigned(pos) += 1
+ }
+ pos = (pos + 1) % numUsable
+ }
+ // Now that we've decided how many cores to give on each node, let's actually give them
+ for (pos <- 0 until numUsable) {
+ if (assigned(pos) > 0) {
+ val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
+ launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
+ app.state = ApplicationState.RUNNING
+ }
+ }
+ }
+ } else {
+ // Pack each app into as few nodes as possible until we've assigned all its cores
+ for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
+ for (app <- waitingApps if app.coresLeft > 0) {
+ if (canUse(app, worker)) {
+ val coresToUse = math.min(worker.coresFree, app.coresLeft)
+ if (coresToUse > 0) {
+ val exec = app.addExecutor(worker, coresToUse)
+ launchExecutor(worker, exec, app.desc.sparkHome)
+ app.state = ApplicationState.RUNNING
+ }
+ }
+ }
+ }
+ }
+ }
+
+ def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
+ logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
+ worker.addExecutor(exec)
+ worker.actor ! LaunchExecutor(
+ exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+ exec.application.driver ! ExecutorAdded(
+ exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
+ }
+
+ def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
+ publicAddress: String): WorkerInfo = {
+ // There may be one or more refs to dead workers on this same node (with different IDs);
+ // remove them.
+ workers.filter { w =>
+ (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
+ }.foreach { w =>
+ workers -= w
+ }
+ val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+ workers += worker
+ idToWorker(worker.id) = worker
+ actorToWorker(sender) = worker
+ addressToWorker(sender.path.address) = worker
+ worker
+ }
+
+ def removeWorker(worker: WorkerInfo) {
+ logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
+ worker.setState(WorkerState.DEAD)
+ idToWorker -= worker.id
+ actorToWorker -= worker.actor
+ addressToWorker -= worker.actor.path.address
+ for (exec <- worker.executors.values) {
+ logInfo("Telling app of lost executor: " + exec.id)
+ exec.application.driver ! ExecutorUpdated(
+ exec.id, ExecutorState.LOST, Some("worker lost"), None)
+ exec.application.removeExecutor(exec)
+ }
+ }
+
+ def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
+ val now = System.currentTimeMillis()
+ val date = new Date(now)
+ val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ applicationMetricsSystem.registerSource(app.appSource)
+ apps += app
+ idToApp(app.id) = app
+ actorToApp(driver) = app
+ addressToApp(driver.path.address) = app
+ if (firstApp == None) {
+ firstApp = Some(app)
+ }
+ val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
+ if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+ logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
+ }
+ app
+ }
+
+ def finishApplication(app: ApplicationInfo) {
+ removeApplication(app, ApplicationState.FINISHED)
+ }
+
+ def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
+ if (apps.contains(app)) {
+ logInfo("Removing app " + app.id)
+ apps -= app
+ idToApp -= app.id
+ actorToApp -= app.driver
+ addressToApp -= app.driver.path.address
+ if (completedApps.size >= RETAINED_APPLICATIONS) {
+ val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+ completedApps.take(toRemove).foreach( a => {
+ applicationMetricsSystem.removeSource(a.appSource)
+ })
+ completedApps.trimStart(toRemove)
+ }
+ completedApps += app // Remember it in our history
+ waitingApps -= app
+ for (exec <- app.executors.values) {
+ exec.worker.removeExecutor(exec)
+ exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
+ exec.state = ExecutorState.KILLED
+ }
+ app.markFinished(state)
+ if (state != ApplicationState.FINISHED) {
+ app.driver ! ApplicationRemoved(state.toString)
+ }
+ schedule()
+ }
+ }
+
+ /** Generate a new app ID given an app's submission date */
+ def newApplicationId(submitDate: Date): String = {
+ val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
+ nextAppNumber += 1
+ appId
+ }
+
+ /** Check for, and remove, any timed-out workers */
+ def timeOutDeadWorkers() {
+ // Copy the workers into an array so we don't modify the hashset while iterating through it
+ val currentTime = System.currentTimeMillis()
+ val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
+ for (worker <- toRemove) {
+ if (worker.state != WorkerState.DEAD) {
+ logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+ worker.id, WORKER_TIMEOUT/1000))
+ removeWorker(worker)
+ } else {
+ if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+ workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+ }
+ }
+ }
+}
+
+private[spark] object Master {
+ private val systemName = "sparkMaster"
+ private val actorName = "Master"
+ private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+
+ def main(argStrings: Array[String]) {
+ val args = new MasterArguments(argStrings)
+ val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+ actorSystem.awaitTermination()
+ }
+
+ /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:port`. */
+ def toAkkaUrl(sparkUrl: String): String = {
+ sparkUrl match {
+ case sparkUrlRegex(host, port) =>
+ "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ case _ =>
+ throw new SparkException("Invalid master URL: " + sparkUrl)
+ }
+ }
+
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
+ (actorSystem, boundPort)
+ }
+}
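
The spread-out branch of schedule() is a round-robin over workers' free cores.
The same loop in isolation, as a standalone sketch (not part of the patch):

    // Distribute up to `requested` cores across workers, one core per worker
    // per pass, mirroring the spreadOutApps branch of Master.schedule().
    def spreadOut(coresFree: Array[Int], requested: Int): Array[Int] = {
      val assigned = new Array[Int](coresFree.length)
      var toAssign = math.min(requested, coresFree.sum)
      var pos = 0
      while (toAssign > 0) {
        if (coresFree(pos) - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % coresFree.length
      }
      assigned
    }

    // e.g. spreadOut(Array(4, 2, 2), 7) returns Array(3, 2, 2)
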
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
new file mode 100644
index 0000000..c86cca2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.util.IntParam
+import org.apache.spark.Utils
+
+/**
+ * Command-line parser for the master.
+ */
+private[spark] class MasterArguments(args: Array[String]) {
+ var host = Utils.localHostName()
+ var port = 7077
+ var webUiPort = 8080
+
+ // Check for settings in environment variables
+ if (System.getenv("SPARK_MASTER_HOST") != null) {
+ host = System.getenv("SPARK_MASTER_HOST")
+ }
+ if (System.getenv("SPARK_MASTER_PORT") != null) {
+ port = System.getenv("SPARK_MASTER_PORT").toInt
+ }
+ if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
+ webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
+ }
+ if (System.getProperty("master.ui.port") != null) {
+ webUiPort = System.getProperty("master.ui.port").toInt
+ }
+
+ parse(args.toList)
+
+ def parse(args: List[String]): Unit = args match {
+ case ("--ip" | "-i") :: value :: tail =>
+ Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
+ host = value
+ parse(tail)
+
+ case ("--host" | "-h") :: value :: tail =>
+ Utils.checkHost(value, "Please use hostname " + value)
+ host = value
+ parse(tail)
+
+ case ("--port" | "-p") :: IntParam(value) :: tail =>
+ port = value
+ parse(tail)
+
+ case "--webui-port" :: IntParam(value) :: tail =>
+ webUiPort = value
+ parse(tail)
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case Nil => {}
+
+ case _ =>
+ printUsageAndExit(1)
+ }
+
+ /**
+ * Print usage and exit JVM with the given exit code.
+ */
+ def printUsageAndExit(exitCode: Int) {
+ System.err.println(
+ "Usage: Master [options]\n" +
+ "\n" +
+ "Options:\n" +
+ " -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +
+ " -h HOST, --host HOST Hostname to listen on\n" +
+ " -p PORT, --port PORT Port to listen on (default: 7077)\n" +
+ " --webui-port PORT Port for web UI (default: 8080)")
+ System.exit(exitCode)
+ }
+}
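
Settings are applied in order (defaults, then environment variables, then the
master.ui.port system property, then command-line flags), so flags win. A usage
sketch (call site assumed inside org.apache.spark.deploy.master, since the class
is private[spark]):

    val parsed = new MasterArguments(Array("--host", "master1", "--webui-port", "8085"))
    // parsed.host == "master1", parsed.port == 7077 (default), parsed.webUiPort == 8085
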
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
new file mode 100644
index 0000000..8dd0a42
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -0,0 +1,25 @@
+package org.apache.spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class MasterSource(val master: Master) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "master"
+
+ // Gauge for the number of workers in the cluster
+ metricRegistry.register(MetricRegistry.name("workers", "number"), new Gauge[Int] {
+ override def getValue: Int = master.workers.size
+ })
+
+ // Gauge for the number of applications in the cluster
+ metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.apps.size
+ })
+
+ // Gauge for the number of waiting applications in the cluster
+ metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.waitingApps.size
+ })
+}
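
Each Gauge is re-evaluated every time it is read, so the reported values track
the master's live collections rather than a snapshot. A hedged sketch of
reading the gauges back through the Codahale 3.x registry API (`master` stands
in for a constructed Master instance, which this snippet does not show):

    import scala.collection.JavaConverters._

    val source = new MasterSource(master)
    for ((name, gauge) <- source.metricRegistry.getGauges.asScala) {
      println(name + " = " + gauge.getValue)  // e.g. "workers.number = 3"
    }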
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
new file mode 100644
index 0000000..285e07a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import scala.collection.mutable
+import akka.actor.ActorRef
+import org.apache.spark.Utils
+
+private[spark] class WorkerInfo(
+ val id: String,
+ val host: String,
+ val port: Int,
+ val cores: Int,
+ val memory: Int,
+ val actor: ActorRef,
+ val webUiPort: Int,
+ val publicAddress: String) {
+
+ Utils.checkHost(host, "Expected hostname")
+ assert(port > 0)
+
+ val executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
+ var state: WorkerState.Value = WorkerState.ALIVE
+ var coresUsed = 0
+ var memoryUsed = 0
+
+ var lastHeartbeat = System.currentTimeMillis()
+
+ def coresFree: Int = cores - coresUsed
+ def memoryFree: Int = memory - memoryUsed
+
+ def hostPort: String = {
+ assert(port > 0)
+ host + ":" + port
+ }
+
+ def addExecutor(exec: ExecutorInfo) {
+ executors(exec.fullId) = exec
+ coresUsed += exec.cores
+ memoryUsed += exec.memory
+ }
+
+ def removeExecutor(exec: ExecutorInfo) {
+ if (executors.contains(exec.fullId)) {
+ executors -= exec.fullId
+ coresUsed -= exec.cores
+ memoryUsed -= exec.memory
+ }
+ }
+
+ def hasExecutor(app: ApplicationInfo): Boolean = {
+ executors.values.exists(_.application == app)
+ }
+
+ def webUiAddress: String = {
+ "http://" + this.publicAddress + ":" + this.webUiPort
+ }
+
+ def setState(state: WorkerState.Value) = {
+ this.state = state
+ }
+}
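
The free-capacity figures are plain subtraction over the executors currently
assigned, which is what the master's scheduler consults. A self-contained
analogue of the accounting (hypothetical sizes; the real class tracks
ExecutorInfo objects keyed by fullId):

    val (cores, memory) = (8, 16384)    // an 8-core worker with 16 GB, in MB
    var (coresUsed, memoryUsed) = (0, 0)
    def coresFree = cores - coresUsed
    def memoryFree = memory - memoryUsed
    coresUsed += 4; memoryUsed += 8192  // addExecutor() of a 4-core / 8 GB executor
    assert(coresFree == 4 && memoryFree == 8192)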
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
new file mode 100644
index 0000000..b5ee6dc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object WorkerState extends Enumeration {
+ type WorkerState = Value
+
+ val ALIVE, DEAD, DECOMMISSIONED = Value
+}
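
The enumeration's string names surface directly in the master web UI and the
JSON protocol, so states round-trip by name. A quick REPL-style sketch:

    WorkerState.ALIVE.toString              // "ALIVE"
    WorkerState.withName("DECOMMISSIONED")  // WorkerState.DECOMMISSIONED
    WorkerState.values.toList               // List(ALIVE, DEAD, DECOMMISSIONED)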
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
new file mode 100644
index 0000000..6435c7f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import scala.xml.Node
+
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+
+import javax.servlet.http.HttpServletRequest
+
+import net.liftweb.json.JsonAST.JValue
+
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.master.ExecutorInfo
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.Utils
+
+private[spark] class ApplicationPage(parent: MasterWebUI) {
+ val master = parent.masterActorRef
+ implicit val timeout = parent.timeout
+
+ /** Executor details for a particular application, rendered as JSON */
+ def renderJson(request: HttpServletRequest): JValue = {
+ val appId = request.getParameter("appId")
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+ val state = Await.result(stateFuture, 30 seconds)
+ val app = state.activeApps.find(_.id == appId)
+ .orElse(state.completedApps.find(_.id == appId))
+ .orNull
+ JsonProtocol.writeApplicationInfo(app)
+ }
+
+ /** Executor details for a particular application */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val appId = request.getParameter("appId")
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+ val state = Await.result(stateFuture, 30 seconds)
+ val app = state.activeApps.find(_.id == appId)
+ .orElse(state.completedApps.find(_.id == appId))
+ .orNull
+
+ val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
+ val executors = app.executors.values.toSeq
+ val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
+
+ val content =
+ <div class="row-fluid">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {app.id}</li>
+ <li><strong>Name:</strong> {app.desc.name}</li>
+ <li><strong>User:</strong> {app.desc.user}</li>
+ <li><strong>Cores:</strong>
+ {
+ if (app.desc.maxCores == Integer.MAX_VALUE) {
+ "Unlimited (%s granted)".format(app.coresGranted)
+ } else {
+ "%s (%s granted, %s left)".format(
+ app.desc.maxCores, app.coresGranted, app.coresLeft)
+ }
+ }
+ </li>
+ <li>
+ <strong>Executor Memory:</strong>
+ {Utils.megabytesToString(app.desc.memoryPerSlave)}
+ </li>
+ <li><strong>Submit Date:</strong> {app.submitDate}</li>
+ <li><strong>State:</strong> {app.state}</li>
+ <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li>
+ </ul>
+ </div>
+ </div>
+
+ <div class="row-fluid"> <!-- Executors -->
+ <div class="span12">
+ <h4> Executor Summary </h4>
+ {executorTable}
+ </div>
+ </div>;
+ UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
+ }
+
+ def executorRow(executor: ExecutorInfo): Seq[Node] = {
+ <tr>
+ <td>{executor.id}</td>
+ <td>
+ <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
+ </td>
+ <td>{executor.cores}</td>
+ <td>{executor.memory}</td>
+ <td>{executor.state}</td>
+ <td>
+ <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
+ .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+ <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
+ .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
+ </td>
+ </tr>
+ }
+}
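
The /app/json route wired up in MasterWebUI exposes renderJson over HTTP, so
the same executor details can be scraped programmatically. A hedged sketch
(host, port and application id are placeholders):

    import scala.io.Source

    val url = "http://master1.example.com:8080/app/json?appId=app-20130901000000-0000"
    println(Source.fromURL(url).mkString)  // JSON from JsonProtocol.writeApplicationInfo

Note that both render methods fall back to null when the id matches neither an
active nor a completed application, so an unknown appId fails on the first
dereference of app.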
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
new file mode 100644
index 0000000..58d3863
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+
+import net.liftweb.json.JsonAST.JValue
+
+import org.apache.spark.Utils
+import org.apache.spark.deploy.DeployWebUI
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.ui.UIUtils
+
+private[spark] class IndexPage(parent: MasterWebUI) {
+ val master = parent.masterActorRef
+ implicit val timeout = parent.timeout
+
+ def renderJson(request: HttpServletRequest): JValue = {
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+ val state = Await.result(stateFuture, 30 seconds)
+ JsonProtocol.writeMasterState(state)
+ }
+
+ /** Index view listing applications and executors */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+ val state = Await.result(stateFuture, 30 seconds)
+
+ val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
+ val workers = state.workers.sortBy(_.id)
+ val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
+
+ val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
+ "State", "Duration")
+ val activeApps = state.activeApps.sortBy(_.startTime).reverse
+ val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
+ val completedApps = state.completedApps.sortBy(_.endTime).reverse
+ val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+
+ val content =
+ <div class="row-fluid">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>URL:</strong> {state.uri}</li>
+ <li><strong>Workers:</strong> {state.workers.size}</li>
+ <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
+ {state.workers.map(_.coresUsed).sum} Used</li>
+ <li><strong>Memory:</strong>
+ {Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
+ {Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
+ <li><strong>Applications:</strong>
+ {state.activeApps.size} Running,
+ {state.completedApps.size} Completed </li>
+ </ul>
+ </div>
+ </div>
+
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Workers </h4>
+ {workerTable}
+ </div>
+ </div>
+
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Running Applications </h4>
+
+ {activeAppsTable}
+ </div>
+ </div>
+
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Completed Applications </h4>
+ {completedAppsTable}
+ </div>
+ </div>;
+ UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
+ }
+
+ def workerRow(worker: WorkerInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={worker.webUiAddress}>{worker.id}</a>
+ </td>
+ <td>{worker.host}:{worker.port}</td>
+ <td>{worker.state}</td>
+ <td>{worker.cores} ({worker.coresUsed} Used)</td>
+ <td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
+ {Utils.megabytesToString(worker.memory)}
+ ({Utils.megabytesToString(worker.memoryUsed)} Used)
+ </td>
+ </tr>
+ }
+
+ def appRow(app: ApplicationInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={"app?appId=" + app.id}>{app.id}</a>
+ </td>
+ <td>
+ <a href={app.appUiUrl}>{app.desc.name}</a>
+ </td>
+ <td>
+ {app.coresGranted}
+ </td>
+ <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
+ {Utils.megabytesToString(app.desc.memoryPerSlave)}
+ </td>
+ <td>{DeployWebUI.formatDate(app.submitDate)}</td>
+ <td>{app.desc.user}</td>
+ <td>{app.state.toString}</td>
+ <td>{DeployWebUI.formatDuration(app.duration)}</td>
+ </tr>
+ }
+}
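
The cluster totals in the summary block are straight folds over the worker
list. A tiny sketch with hypothetical per-worker (cores, coresUsed) pairs:

    val workers = Seq((8, 4), (8, 0), (4, 4))
    workers.map(_._1).sum  // 20 cores total
    workers.map(_._2).sum  // 8 cores used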
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
new file mode 100644
index 0000000..47b1e52
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import akka.util.Duration
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.{Handler, Server}
+
+import org.apache.spark.{Logging, Utils}
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.ui.JettyUtils
+import org.apache.spark.ui.JettyUtils._
+
+/**
+ * Web UI server for the standalone master.
+ */
+private[spark]
+class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
+ implicit val timeout = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val host = Utils.localHostName()
+ val port = requestedPort
+
+ val masterActorRef = master.self
+
+ var server: Option[Server] = None
+ var boundPort: Option[Int] = None
+
+ val applicationPage = new ApplicationPage(this)
+ val indexPage = new IndexPage(this)
+
+ def start() {
+ try {
+ val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ server = Some(srv)
+ boundPort = Some(bPort)
+ logInfo("Started Master web UI at http://%s:%d".format(host, boundPort.get))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Master JettyUtils", e)
+ System.exit(1)
+ }
+ }
+
+ val metricsHandlers = master.masterMetricsSystem.getServletHandlers ++
+ master.applicationMetricsSystem.getServletHandlers
+
+ val handlers = metricsHandlers ++ Array[(String, Handler)](
+ ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
+ ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
+ ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
+ ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
+ ("*", (request: HttpServletRequest) => indexPage.render(request))
+ )
+
+ def stop() {
+ server.foreach(_.stop())
+ }
+}
+
+private[spark] object MasterWebUI {
+ val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+}
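
The catch-all "*" route is listed last, which keeps it from shadowing /app,
/app/json and /json (assuming handlers are tried in the order given, as the
layout suggests). A hedged sketch of bringing the UI up by hand (the Master
normally does this itself; `master` stands in for a constructed Master):

    val webUi = new MasterWebUI(master, 8080)
    webUi.start()             // binds 0.0.0.0 via JettyUtils
    println(webUi.boundPort)  // Some(port) actually bound, as reported by Jetty
    webUi.stop()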
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
new file mode 100644
index 0000000..01ce4a6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import java.io._
+import java.lang.System.getenv
+
+import akka.actor.ActorRef
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+import org.apache.spark.{Utils, Logging}
+import org.apache.spark.deploy.{ExecutorState, ApplicationDescription}
+import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+
+/**
+ * Manages the execution of one executor process.
+ */
+private[spark] class ExecutorRunner(
+ val appId: String,
+ val execId: Int,
+ val appDesc: ApplicationDescription,
+ val cores: Int,
+ val memory: Int,
+ val worker: ActorRef,
+ val workerId: String,
+ val host: String,
+ val sparkHome: File,
+ val workDir: File)
+ extends Logging {
+
+ val fullId = appId + "/" + execId
+ var workerThread: Thread = null
+ var process: Process = null
+ var shutdownHook: Thread = null
+
+ private def getAppEnv(key: String): Option[String] =
+ appDesc.command.environment.get(key).orElse(Option(getenv(key)))
+
+ def start() {
+ workerThread = new Thread("ExecutorRunner for " + fullId) {
+ override def run() { fetchAndRunExecutor() }
+ }
+ workerThread.start()
+
+ // Shutdown hook that kills the child process on JVM shutdown.
+ shutdownHook = new Thread() {
+ override def run() {
+ if (process != null) {
+ logInfo("Shutdown hook killing child process.")
+ process.destroy()
+ process.waitFor()
+ }
+ }
+ }
+ Runtime.getRuntime.addShutdownHook(shutdownHook)
+ }
+
+ /** Stop this executor runner, including killing the process it launched */
+ def kill() {
+ if (workerThread != null) {
+ workerThread.interrupt()
+ workerThread = null
+ if (process != null) {
+ logInfo("Killing process!")
+ process.destroy()
+ process.waitFor()
+ }
+ worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
+ Runtime.getRuntime.removeShutdownHook(shutdownHook)
+ }
+ }
+
+ /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
+ def substituteVariables(argument: String): String = argument match {
+ case "{{EXECUTOR_ID}}" => execId.toString
+ case "{{HOSTNAME}}" => host
+ case "{{CORES}}" => cores.toString
+ case other => other
+ }
+
+ def buildCommandSeq(): Seq[String] = {
+ val command = appDesc.command
+ val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
+ // SPARK-698: do not call the run.cmd script, as process.destroy()
+ // fails to kill a process tree on Windows
+ Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
+ command.arguments.map(substituteVariables)
+ }
+
+ /**
+ * Attention: this must always be aligned with the environment variables in the run scripts and
+ * the way the JAVA_OPTS are assembled there.
+ */
+ def buildJavaOpts(): Seq[String] = {
+ val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
+ .map(p => List("-Djava.library.path=" + p))
+ .getOrElse(Nil)
+ val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
+ .map(Utils.splitCommandString).getOrElse(Nil)
+ val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
+ val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
+
+ // Figure out our classpath with the external compute-classpath script
+ val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
+ val classPath = Utils.executeAndGetOutput(
+ Seq(sparkHome + "/bin/compute-classpath" + ext),
+ extraEnvironment = appDesc.command.environment)
+
+ Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
+ }
+
+ /** Spawn a thread that will redirect a given stream to a file */
+ def redirectStream(in: InputStream, file: File) {
+ val out = new FileOutputStream(file, true)
+ new Thread("redirect output to " + file) {
+ override def run() {
+ try {
+ Utils.copyStream(in, out, true)
+ } catch {
+ case e: IOException =>
+ logInfo("Redirection to " + file + " closed: " + e.getMessage)
+ }
+ }
+ }.start()
+ }
+
+ /**
+ * Download and run the executor described in our ApplicationDescription
+ */
+ def fetchAndRunExecutor() {
+ try {
+ // Create the executor's working directory
+ val executorDir = new File(workDir, appId + "/" + execId)
+ if (!executorDir.mkdirs()) {
+ throw new IOException("Failed to create directory " + executorDir)
+ }
+
+ // Launch the process
+ val command = buildCommandSeq()
+ logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+ val builder = new ProcessBuilder(command: _*).directory(executorDir)
+ val env = builder.environment()
+ for ((key, value) <- appDesc.command.environment) {
+ env.put(key, value)
+ }
+ // In case we are running this from within the Spark Shell, avoid creating a "scala"
+ // parent process for the executor command
+ env.put("SPARK_LAUNCH_WITH_SCALA", "0")
+ process = builder.start()
+
+ val header = "Spark Executor Command: %s\n%s\n\n".format(
+ command.mkString("\"", "\" \"", "\""), "=" * 40)
+
+ // Redirect its stdout and stderr to files
+ val stdout = new File(executorDir, "stdout")
+ redirectStream(process.getInputStream, stdout)
+
+ val stderr = new File(executorDir, "stderr")
+ Files.write(header, stderr, Charsets.UTF_8)
+ redirectStream(process.getErrorStream, stderr)
+
+ // Wait for the process to exit. Reaching this point is itself a failure, since we expect
+ // to run long-lived processes only. However, in the future, we might restart the executor
+ // a few times on the same machine.
+ val exitCode = process.waitFor()
+ val message = "Command exited with code " + exitCode
+ worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
+ Some(exitCode))
+ } catch {
+ case interrupted: InterruptedException =>
+ logInfo("Runner thread for executor " + fullId + " interrupted")
+
+ case e: Exception =>
+ logError("Error running executor", e)
+ if (process != null) {
+ process.destroy()
+ }
+ val message = e.getClass + ": " + e.getMessage
+ worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
+ }
+ }
+}
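
The {{...}} placeholders let one ApplicationDescription serve every worker;
each ExecutorRunner fills in its own identity at launch time. Illustrative
values, evaluated inside a runner with execId = 7, host = "worker3.example.com"
and cores = 4:

    Seq("{{EXECUTOR_ID}}", "--host", "{{HOSTNAME}}", "--cores", "{{CORES}}")
      .map(substituteVariables)
    // => Seq("7", "--host", "worker3.example.com", "--cores", "4")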