Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:07 UTC
[23/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
deleted file mode 100644
index 0db13ff..0000000
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import scala.collection.immutable.List
-
-import spark.Utils
-import spark.deploy.ExecutorState.ExecutorState
-import spark.deploy.master.{WorkerInfo, ApplicationInfo}
-import spark.deploy.worker.ExecutorRunner
-
-
-private[deploy] sealed trait DeployMessage extends Serializable
-
-private[deploy] object DeployMessages {
-
- // Worker to Master
-
- case class RegisterWorker(
- id: String,
- host: String,
- port: Int,
- cores: Int,
- memory: Int,
- webUiPort: Int,
- publicAddress: String)
- extends DeployMessage {
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
- }
-
- case class ExecutorStateChanged(
- appId: String,
- execId: Int,
- state: ExecutorState,
- message: Option[String],
- exitStatus: Option[Int])
- extends DeployMessage
-
- case class Heartbeat(workerId: String) extends DeployMessage
-
- // Master to Worker
-
- case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-
- case class RegisterWorkerFailed(message: String) extends DeployMessage
-
- case class KillExecutor(appId: String, execId: Int) extends DeployMessage
-
- case class LaunchExecutor(
- appId: String,
- execId: Int,
- appDesc: ApplicationDescription,
- cores: Int,
- memory: Int,
- sparkHome: String)
- extends DeployMessage
-
- // Client to Master
-
- case class RegisterApplication(appDescription: ApplicationDescription)
- extends DeployMessage
-
- // Master to Client
-
- case class RegisteredApplication(appId: String) extends DeployMessage
-
- // TODO(matei): replace hostPort with host
- case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
- Utils.checkHostPort(hostPort, "Required hostport")
- }
-
- case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
- exitStatus: Option[Int])
-
- case class ApplicationRemoved(message: String)
-
- // Internal message in Client
-
- case object StopClient
-
- // MasterWebUI To Master
-
- case object RequestMasterState
-
- // Master to MasterWebUI
-
- case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
- activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
-
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
-
- def uri = "spark://" + host + ":" + port
- }
-
- // WorkerWebUI to Worker
-
- case object RequestWorkerState
-
- // Worker to WorkerWebUI
-
- case class WorkerStateResponse(host: String, port: Int, workerId: String,
- executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
- cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
-
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
- }
-
- // Actor System to Master
-
- case object CheckForWorkerTimeOut
-
-}
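
This hunk only records the deletion; per the commit subject, the file is presumably recreated under core/src/main/scala/org/apache/spark/deploy/ with the package and imports rewritten. A minimal sketch of what the renamed header would look like (assumed from the subject line, not shown in this hunk):

    package org.apache.spark.deploy

    import scala.collection.immutable.List

    import org.apache.spark.Utils
    import org.apache.spark.deploy.ExecutorState.ExecutorState
    import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
    import org.apache.spark.deploy.worker.ExecutorRunner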
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/ExecutorState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/ExecutorState.scala b/core/src/main/scala/spark/deploy/ExecutorState.scala
deleted file mode 100644
index 08c9a3b..0000000
--- a/core/src/main/scala/spark/deploy/ExecutorState.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-private[spark] object ExecutorState
- extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
-
- val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
-
- type ExecutorState = Value
-
- def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
-}
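
A quick usage sketch of this enumeration (results shown as comments); note that the Enumeration(names: String*) constructor used here is Scala 2.9-era and was deprecated in later Scala releases:

    import spark.deploy.ExecutorState

    ExecutorState.isFinished(ExecutorState.RUNNING)  // false
    ExecutorState.isFinished(ExecutorState.KILLED)   // true
    ExecutorState.LAUNCHING.toString                 // "LAUNCHING", from the name list above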
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
deleted file mode 100644
index f8dcf02..0000000
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import net.liftweb.json.JsonDSL._
-
-import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import spark.deploy.master.{ApplicationInfo, WorkerInfo}
-import spark.deploy.worker.ExecutorRunner
-
-
-private[spark] object JsonProtocol {
- def writeWorkerInfo(obj: WorkerInfo) = {
- ("id" -> obj.id) ~
- ("host" -> obj.host) ~
- ("port" -> obj.port) ~
- ("webuiaddress" -> obj.webUiAddress) ~
- ("cores" -> obj.cores) ~
- ("coresused" -> obj.coresUsed) ~
- ("memory" -> obj.memory) ~
- ("memoryused" -> obj.memoryUsed) ~
- ("state" -> obj.state.toString)
- }
-
- def writeApplicationInfo(obj: ApplicationInfo) = {
- ("starttime" -> obj.startTime) ~
- ("id" -> obj.id) ~
- ("name" -> obj.desc.name) ~
- ("cores" -> obj.desc.maxCores) ~
- ("user" -> obj.desc.user) ~
- ("memoryperslave" -> obj.desc.memoryPerSlave) ~
- ("submitdate" -> obj.submitDate.toString)
- }
-
- def writeApplicationDescription(obj: ApplicationDescription) = {
- ("name" -> obj.name) ~
- ("cores" -> obj.maxCores) ~
- ("memoryperslave" -> obj.memoryPerSlave) ~
- ("user" -> obj.user)
- }
-
- def writeExecutorRunner(obj: ExecutorRunner) = {
- ("id" -> obj.execId) ~
- ("memory" -> obj.memory) ~
- ("appid" -> obj.appId) ~
- ("appdesc" -> writeApplicationDescription(obj.appDesc))
- }
-
- def writeMasterState(obj: MasterStateResponse) = {
- ("url" -> ("spark://" + obj.uri)) ~
- ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
- ("cores" -> obj.workers.map(_.cores).sum) ~
- ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
- ("memory" -> obj.workers.map(_.memory).sum) ~
- ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
- ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
- ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
- }
-
- def writeWorkerState(obj: WorkerStateResponse) = {
- ("id" -> obj.workerId) ~
- ("masterurl" -> obj.masterUrl) ~
- ("masterwebuiurl" -> obj.masterWebUiUrl) ~
- ("cores" -> obj.cores) ~
- ("coresused" -> obj.coresUsed) ~
- ("memory" -> obj.memory) ~
- ("memoryused" -> obj.memoryUsed) ~
- ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
- ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
- }
-}
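
One thing worth noting: writeMasterState prepends "spark://" to obj.uri, but MasterStateResponse.uri (DeployMessage.scala above) already returns "spark://host:port", so the "url" field presumably renders as "spark://spark://host:port"; one of the two prefixes looks redundant. For reference, a sketch of turning one of these JsonDSL values into a string with the usual lift-json idiom:

    import net.liftweb.json._
    import net.liftweb.json.JsonDSL._

    val json = ("id" -> "worker-1") ~ ("cores" -> 4)
    compact(render(json))  // {"id":"worker-1","cores":4}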
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
deleted file mode 100644
index 6b8e9f2..0000000
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-
-import spark.deploy.worker.Worker
-import spark.deploy.master.Master
-import spark.util.AkkaUtils
-import spark.{Logging, Utils}
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Testing class that creates a Spark standalone cluster in-process (that is, running the
- * spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVMs). Executors launched
- * by the Workers still run in separate JVMs. This can be used to test distributed operation and
- * fault recovery without spinning up a lot of processes.
- */
-private[spark]
-class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
-
- private val localHostname = Utils.localHostName()
- private val masterActorSystems = ArrayBuffer[ActorSystem]()
- private val workerActorSystems = ArrayBuffer[ActorSystem]()
-
- def start(): String = {
- logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
-
- /* Start the Master */
- val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
- masterActorSystems += masterSystem
- val masterUrl = "spark://" + localHostname + ":" + masterPort
-
- /* Start the Workers */
- for (workerNum <- 1 to numWorkers) {
- val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
- memoryPerWorker, masterUrl, null, Some(workerNum))
- workerActorSystems += workerSystem
- }
-
- return masterUrl
- }
-
- def stop() {
- logInfo("Shutting down local Spark cluster.")
- // Stop the workers before the master so they don't get upset that it disconnected
- workerActorSystems.foreach(_.shutdown())
- workerActorSystems.foreach(_.awaitTermination())
-
- masterActorSystems.foreach(_.shutdown())
- masterActorSystems.foreach(_.awaitTermination())
- }
-}
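
A usage sketch (LocalSparkCluster is private[spark], so this assumes caller code inside the spark package):

    val cluster = new LocalSparkCluster(numWorkers = 2, coresPerWorker = 1, memoryPerWorker = 512)
    val masterUrl = cluster.start()  // e.g. "spark://<local-hostname>:<bound-port>"
    // ... point a SparkContext or test client at masterUrl ...
    cluster.stop()                   // shuts down workers first, then the master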
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
deleted file mode 100644
index 882161e..0000000
--- a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-class SparkHadoopUtil {
-
- // Returns an appropriate subclass of Configuration. Creating a config can initialize some Hadoop subsystems.
- def newConfiguration(): Configuration = new Configuration()
-
- // Add any user credentials to the job conf that are necessary for running on a secure Hadoop cluster.
- def addCredentials(conf: JobConf) {}
-
- def isYarnMode(): Boolean = { false }
-
-}
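
These defaults cover the non-secure, non-YARN case; a YARN build would presumably swap in a subclass. A hypothetical sketch (the class name and behavior here are assumptions, not part of this commit):

    class YarnSparkHadoopUtil extends SparkHadoopUtil {
      override def isYarnMode(): Boolean = true
      // addCredentials would copy the user's delegation tokens into the JobConf
      // so tasks can talk to a secure (Kerberized) HDFS
    }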
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala
deleted file mode 100644
index 8ea7792..0000000
--- a/core/src/main/scala/spark/deploy/WebUI.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-/**
- * Utilities used throughout the web UI.
- */
-private[spark] object DeployWebUI {
- val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-
- def formatDate(date: Date): String = DATE_FORMAT.format(date)
-
- def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
-
- def formatDuration(milliseconds: Long): String = {
- val seconds = milliseconds.toDouble / 1000
- if (seconds < 60) {
- return "%.0f s".format(seconds)
- }
- val minutes = seconds / 60
- if (minutes < 10) {
- return "%.1f min".format(minutes)
- } else if (minutes < 60) {
- return "%.0f min".format(minutes)
- }
- val hours = minutes / 60
- return "%.1f h".format(hours)
- }
-}
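
Worked examples covering formatDuration's four branches (values follow directly from the code above):

    DeployWebUI.formatDuration(45 * 1000L)        // "45 s"    (under a minute)
    DeployWebUI.formatDuration(125 * 1000L)       // "2.1 min" (under 10 minutes: one decimal)
    DeployWebUI.formatDuration(25 * 60 * 1000L)   // "25 min"  (10 to 60 minutes: no decimal)
    DeployWebUI.formatDuration(2 * 3600 * 1000L)  // "2.0 h"   (an hour or more)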
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
deleted file mode 100644
index 9d5ba8a..0000000
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.client
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor._
-import akka.actor.Terminated
-import akka.pattern.ask
-import akka.util.Duration
-import akka.remote.RemoteClientDisconnected
-import akka.remote.RemoteClientLifeCycleEvent
-import akka.remote.RemoteClientShutdown
-import akka.dispatch.Await
-
-import spark.Logging
-import spark.deploy.{ApplicationDescription, ExecutorState}
-import spark.deploy.DeployMessages._
-import spark.deploy.master.Master
-
-
-/**
- * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
- * and a listener for cluster events, and calls back the listener when various events occur.
- */
-private[spark] class Client(
- actorSystem: ActorSystem,
- masterUrl: String,
- appDescription: ApplicationDescription,
- listener: ClientListener)
- extends Logging {
-
- var actor: ActorRef = null
- var appId: String = null
-
- class ClientActor extends Actor with Logging {
- var master: ActorRef = null
- var masterAddress: Address = null
- var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
-
- override def preStart() {
- logInfo("Connecting to master " + masterUrl)
- try {
- master = context.actorFor(Master.toAkkaUrl(masterUrl))
- masterAddress = master.path.address
- master ! RegisterApplication(appDescription)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
- } catch {
- case e: Exception =>
- logError("Failed to connect to master", e)
- markDisconnected()
- context.stop(self)
- }
- }
-
- override def receive = {
- case RegisteredApplication(appId_) =>
- appId = appId_
- listener.connected(appId)
-
- case ApplicationRemoved(message) =>
- logError("Master removed our application: %s; stopping client".format(message))
- markDisconnected()
- context.stop(self)
-
- case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
- val fullId = appId + "/" + id
- logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
- listener.executorAdded(fullId, workerId, hostPort, cores, memory)
-
- case ExecutorUpdated(id, state, message, exitStatus) =>
- val fullId = appId + "/" + id
- val messageText = message.map(s => " (" + s + ")").getOrElse("")
- logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
- if (ExecutorState.isFinished(state)) {
- listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
- }
-
- case Terminated(actor_) if actor_ == master =>
- logError("Connection to master failed; stopping client")
- markDisconnected()
- context.stop(self)
-
- case RemoteClientDisconnected(transport, address) if address == masterAddress =>
- logError("Connection to master failed; stopping client")
- markDisconnected()
- context.stop(self)
-
- case RemoteClientShutdown(transport, address) if address == masterAddress =>
- logError("Connection to master failed; stopping client")
- markDisconnected()
- context.stop(self)
-
- case StopClient =>
- markDisconnected()
- sender ! true
- context.stop(self)
- }
-
- /**
- * Notify the listener that we disconnected, if we hadn't already done so before.
- */
- def markDisconnected() {
- if (!alreadyDisconnected) {
- listener.disconnected()
- alreadyDisconnected = true
- }
- }
- }
-
- def start() {
- // Just launch an actor; it will call back into the listener.
- actor = actorSystem.actorOf(Props(new ClientActor))
- }
-
- def stop() {
- if (actor != null) {
- try {
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
- val future = actor.ask(StopClient)(timeout)
- Await.result(future, timeout)
- } catch {
- case e: TimeoutException =>
- logInfo("Stop request to Master timed out; it may already be shut down.")
- }
- actor = null
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/client/ClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
deleted file mode 100644
index 0640244..0000000
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.client
-
-/**
- * Callbacks invoked by the deploy client when various events happen. There are currently four events:
- * connecting to the cluster, disconnecting, being given an executor, and having an executor
- * removed (either due to failure or due to revocation).
- *
- * Users of this API should *not* block inside the callback methods.
- */
-private[spark] trait ClientListener {
- def connected(appId: String): Unit
-
- def disconnected(): Unit
-
- def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
-
- def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
deleted file mode 100644
index 4f4daa1..0000000
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.client
-
-import spark.util.AkkaUtils
-import spark.{Logging, Utils}
-import spark.deploy.{Command, ApplicationDescription}
-
-private[spark] object TestClient {
-
- class TestListener extends ClientListener with Logging {
- def connected(id: String) {
- logInfo("Connected to master, got app ID " + id)
- }
-
- def disconnected() {
- logInfo("Disconnected from master")
- System.exit(0)
- }
-
- def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {}
-
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
- }
-
- def main(args: Array[String]) {
- val url = args(0)
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
- val desc = new ApplicationDescription(
- "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
- val listener = new TestListener
- val client = new Client(actorSystem, url, desc, listener)
- client.start()
- actorSystem.awaitTermination()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/client/TestExecutor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/client/TestExecutor.scala b/core/src/main/scala/spark/deploy/client/TestExecutor.scala
deleted file mode 100644
index 8a22b6b..0000000
--- a/core/src/main/scala/spark/deploy/client/TestExecutor.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.client
-
-private[spark] object TestExecutor {
- def main(args: Array[String]) {
- println("Hello world!")
- while (true) {
- Thread.sleep(1000)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
deleted file mode 100644
index 6dd2f06..0000000
--- a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import spark.deploy.ApplicationDescription
-import java.util.Date
-import akka.actor.ActorRef
-import scala.collection.mutable
-
-private[spark] class ApplicationInfo(
- val startTime: Long,
- val id: String,
- val desc: ApplicationDescription,
- val submitDate: Date,
- val driver: ActorRef,
- val appUiUrl: String)
-{
- var state = ApplicationState.WAITING
- var executors = new mutable.HashMap[Int, ExecutorInfo]
- var coresGranted = 0
- var endTime = -1L
- val appSource = new ApplicationSource(this)
-
- private var nextExecutorId = 0
-
- def newExecutorId(): Int = {
- val id = nextExecutorId
- nextExecutorId += 1
- id
- }
-
- def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
- val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
- executors(exec.id) = exec
- coresGranted += cores
- exec
- }
-
- def removeExecutor(exec: ExecutorInfo) {
- if (executors.contains(exec.id)) {
- executors -= exec.id
- coresGranted -= exec.cores
- }
- }
-
- def coresLeft: Int = desc.maxCores - coresGranted
-
- private var _retryCount = 0
-
- def retryCount = _retryCount
-
- def incrementRetryCount = {
- _retryCount += 1
- _retryCount
- }
-
- def markFinished(endState: ApplicationState.Value) {
- state = endState
- endTime = System.currentTimeMillis()
- }
-
- def duration: Long = {
- if (endTime != -1) {
- endTime - startTime
- } else {
- System.currentTimeMillis() - startTime
- }
- }
-
-}
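
The core accounting is plain bookkeeping against desc.maxCores; an illustrative trace (worker arguments elided):

    // With desc.maxCores = 8:
    // addExecutor(workerA, cores = 4)  => coresGranted = 4, coresLeft = 4
    // addExecutor(workerB, cores = 2)  => coresGranted = 6, coresLeft = 2
    // removeExecutor(execOnB)          => coresGranted = 4, coresLeft = 4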
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
deleted file mode 100644
index 4df2b6b..0000000
--- a/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package spark.deploy.master
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import spark.metrics.source.Source
-
-class ApplicationSource(val application: ApplicationInfo) extends Source {
- val metricRegistry = new MetricRegistry()
- val sourceName = "%s.%s.%s".format("application", application.desc.name,
- System.currentTimeMillis())
-
- metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
- override def getValue: String = application.state.toString
- })
-
- metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
- override def getValue: Long = application.duration
- })
-
- metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
- override def getValue: Int = application.coresGranted
- })
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/spark/deploy/master/ApplicationState.scala
deleted file mode 100644
index 94f0ad8..0000000
--- a/core/src/main/scala/spark/deploy/master/ApplicationState.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-private[spark] object ApplicationState
- extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
-
- type ApplicationState = Value
-
- val WAITING, RUNNING, FINISHED, FAILED = Value
-
- val MAX_NUM_RETRY = 10
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
deleted file mode 100644
index 99b60f7..0000000
--- a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import spark.deploy.ExecutorState
-
-private[spark] class ExecutorInfo(
- val id: Int,
- val application: ApplicationInfo,
- val worker: WorkerInfo,
- val cores: Int,
- val memory: Int) {
-
- var state = ExecutorState.LAUNCHING
-
- def fullId: String = application.id + "/" + id
-}
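
Example of the fullId convention (the app ID format comes from Master.newApplicationId below):

    // application.id = "app-20130901235959-0000", id = 3
    // fullId == "app-20130901235959-0000/3"
    // the same appId + "/" + execId string Client.scala logs for executor events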
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
deleted file mode 100644
index 04af5e1..0000000
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
-import akka.actor._
-import akka.actor.Terminated
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
-import akka.util.duration._
-
-import spark.{Logging, SparkException, Utils}
-import spark.deploy.{ApplicationDescription, ExecutorState}
-import spark.deploy.DeployMessages._
-import spark.deploy.master.ui.MasterWebUI
-import spark.metrics.MetricsSystem
-import spark.util.AkkaUtils
-
-
-private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
- val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
- val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
- val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
- val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
-
- var nextAppNumber = 0
- val workers = new HashSet[WorkerInfo]
- val idToWorker = new HashMap[String, WorkerInfo]
- val actorToWorker = new HashMap[ActorRef, WorkerInfo]
- val addressToWorker = new HashMap[Address, WorkerInfo]
-
- val apps = new HashSet[ApplicationInfo]
- val idToApp = new HashMap[String, ApplicationInfo]
- val actorToApp = new HashMap[ActorRef, ApplicationInfo]
- val addressToApp = new HashMap[Address, ApplicationInfo]
-
- val waitingApps = new ArrayBuffer[ApplicationInfo]
- val completedApps = new ArrayBuffer[ApplicationInfo]
-
- var firstApp: Option[ApplicationInfo] = None
-
- Utils.checkHost(host, "Expected hostname")
-
- val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
- val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
- val masterSource = new MasterSource(this)
-
- val webUi = new MasterWebUI(this, webUiPort)
-
- val masterPublicAddress = {
- val envVar = System.getenv("SPARK_PUBLIC_DNS")
- if (envVar != null) envVar else host
- }
-
- // As a temporary workaround before better ways of configuring memory, we allow users to set
- // a flag that will perform round-robin scheduling across the nodes (spreading out each app
- // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
-
- override def preStart() {
- logInfo("Starting Spark master at spark://" + host + ":" + port)
- // Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- webUi.start()
- context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
-
- masterMetricsSystem.registerSource(masterSource)
- masterMetricsSystem.start()
- applicationMetricsSystem.start()
- }
-
- override def postStop() {
- webUi.stop()
- masterMetricsSystem.stop()
- applicationMetricsSystem.stop()
- }
-
- override def receive = {
- case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
- logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
- host, workerPort, cores, Utils.megabytesToString(memory)))
- if (idToWorker.contains(id)) {
- sender ! RegisterWorkerFailed("Duplicate worker ID")
- } else {
- addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
- schedule()
- }
- }
-
- case RegisterApplication(description) => {
- logInfo("Registering app " + description.name)
- val app = addApplication(description, sender)
- logInfo("Registered app " + description.name + " with ID " + app.id)
- waitingApps += app
- context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredApplication(app.id)
- schedule()
- }
-
- case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
- val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
- execOption match {
- case Some(exec) => {
- exec.state = state
- exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
- if (ExecutorState.isFinished(state)) {
- val appInfo = idToApp(appId)
- // Remove this executor from the worker and app
- logInfo("Removing executor " + exec.fullId + " because it is " + state)
- appInfo.removeExecutor(exec)
- exec.worker.removeExecutor(exec)
-
- // Only retry certain number of times so we don't go into an infinite loop.
- if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
- schedule()
- } else {
- logError("Application %s with ID %s failed %d times, removing it".format(
- appInfo.desc.name, appInfo.id, appInfo.retryCount))
- removeApplication(appInfo, ApplicationState.FAILED)
- }
- }
- }
- case None =>
- logWarning("Got status update for unknown executor " + appId + "/" + execId)
- }
- }
-
- case Heartbeat(workerId) => {
- idToWorker.get(workerId) match {
- case Some(workerInfo) =>
- workerInfo.lastHeartbeat = System.currentTimeMillis()
- case None =>
- logWarning("Got heartbeat from unregistered worker " + workerId)
- }
- }
-
- case Terminated(actor) => {
- // The disconnected actor could've been either a worker or an app; remove whichever of
- // those we have an entry for in the corresponding actor hashmap
- actorToWorker.get(actor).foreach(removeWorker)
- actorToApp.get(actor).foreach(finishApplication)
- }
-
- case RemoteClientDisconnected(transport, address) => {
- // The disconnected client could've been either a worker or an app; remove whichever it was
- addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(finishApplication)
- }
-
- case RemoteClientShutdown(transport, address) => {
- // The disconnected client could've been either a worker or an app; remove whichever it was
- addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(finishApplication)
- }
-
- case RequestMasterState => {
- sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
- }
-
- case CheckForWorkerTimeOut => {
- timeOutDeadWorkers()
- }
- }
-
- /**
- * Can an app use the given worker? True if the worker has enough memory and we haven't already
- * launched an executor for the app on it (right now the standalone backend doesn't like having
- * two executors on the same worker).
- */
- def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
- worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
- }
-
- /**
- * Schedule the currently available resources among waiting apps. This method will be called
- * every time a new app joins or resource availability changes.
- */
- def schedule() {
- // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
- // in the queue, then the second app, etc.
- if (spreadOutApps) {
- // Try to spread out each app among all the nodes, until it has all its cores
- for (app <- waitingApps if app.coresLeft > 0) {
- val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
- val numUsable = usableWorkers.length
- val assigned = new Array[Int](numUsable) // Number of cores to give on each node
- var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
- var pos = 0
- while (toAssign > 0) {
- if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
- toAssign -= 1
- assigned(pos) += 1
- }
- pos = (pos + 1) % numUsable
- }
- // Now that we've decided how many cores to give on each node, let's actually give them
- for (pos <- 0 until numUsable) {
- if (assigned(pos) > 0) {
- val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
- launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
- app.state = ApplicationState.RUNNING
- }
- }
- }
- } else {
- // Pack each app into as few nodes as possible until we've assigned all its cores
- for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
- for (app <- waitingApps if app.coresLeft > 0) {
- if (canUse(app, worker)) {
- val coresToUse = math.min(worker.coresFree, app.coresLeft)
- if (coresToUse > 0) {
- val exec = app.addExecutor(worker, coresToUse)
- launchExecutor(worker, exec, app.desc.sparkHome)
- app.state = ApplicationState.RUNNING
- }
- }
- }
- }
- }
- }
-
- def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
- logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
- worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(
- exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
- exec.application.driver ! ExecutorAdded(
- exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
- }
-
- def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
- publicAddress: String): WorkerInfo = {
- // There may be one or more refs to dead workers on this same node (with different IDs);
- // remove them.
- workers.filter { w =>
- (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
- }.foreach { w =>
- workers -= w
- }
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
- workers += worker
- idToWorker(worker.id) = worker
- actorToWorker(sender) = worker
- addressToWorker(sender.path.address) = worker
- worker
- }
-
- def removeWorker(worker: WorkerInfo) {
- logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
- worker.setState(WorkerState.DEAD)
- idToWorker -= worker.id
- actorToWorker -= worker.actor
- addressToWorker -= worker.actor.path.address
- for (exec <- worker.executors.values) {
- logInfo("Telling app of lost executor: " + exec.id)
- exec.application.driver ! ExecutorUpdated(
- exec.id, ExecutorState.LOST, Some("worker lost"), None)
- exec.application.removeExecutor(exec)
- }
- }
-
- def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
- val now = System.currentTimeMillis()
- val date = new Date(now)
- val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
- applicationMetricsSystem.registerSource(app.appSource)
- apps += app
- idToApp(app.id) = app
- actorToApp(driver) = app
- addressToApp(driver.path.address) = app
- if (firstApp == None) {
- firstApp = Some(app)
- }
- val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
- if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
- logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
- }
- app
- }
-
- def finishApplication(app: ApplicationInfo) {
- removeApplication(app, ApplicationState.FINISHED)
- }
-
- def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
- if (apps.contains(app)) {
- logInfo("Removing app " + app.id)
- apps -= app
- idToApp -= app.id
- actorToApp -= app.driver
- addressToApp -= app.driver.path.address
- if (completedApps.size >= RETAINED_APPLICATIONS) {
- val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
- completedApps.take(toRemove).foreach( a => {
- applicationMetricsSystem.removeSource(a.appSource)
- })
- completedApps.trimStart(toRemove)
- }
- completedApps += app // Remember it in our history
- waitingApps -= app
- for (exec <- app.executors.values) {
- exec.worker.removeExecutor(exec)
- exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
- exec.state = ExecutorState.KILLED
- }
- app.markFinished(state)
- if (state != ApplicationState.FINISHED) {
- app.driver ! ApplicationRemoved(state.toString)
- }
- schedule()
- }
- }
-
- /** Generate a new app ID given an app's submission date */
- def newApplicationId(submitDate: Date): String = {
- val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
- nextAppNumber += 1
- appId
- }
-
- /** Check for, and remove, any timed-out workers */
- def timeOutDeadWorkers() {
- // Copy the workers into an array so we don't modify the hashset while iterating through it
- val currentTime = System.currentTimeMillis()
- val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
- for (worker <- toRemove) {
- if (worker.state != WorkerState.DEAD) {
- logWarning("Removing %s because we got no heartbeat in %d seconds".format(
- worker.id, WORKER_TIMEOUT/1000))
- removeWorker(worker)
- } else {
- if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
- workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
- }
- }
- }
-}
-
-private[spark] object Master {
- private val systemName = "sparkMaster"
- private val actorName = "Master"
- private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
-
- def main(argStrings: Array[String]) {
- val args = new MasterArguments(argStrings)
- val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
- actorSystem.awaitTermination()
- }
-
- /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
- def toAkkaUrl(sparkUrl: String): String = {
- sparkUrl match {
- case sparkUrlRegex(host, port) =>
- "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
- case _ =>
- throw new SparkException("Invalid master URL: " + sparkUrl)
- }
- }
-
- def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
- (actorSystem, boundPort)
- }
-}
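
A worked trace of the spread-out branch of schedule(): suppose one waiting app has coresLeft = 5 and three usable ALIVE workers have coresFree 4, 2 and 2 (already sorted descending). Then:

    // toAssign = min(5, 4 + 2 + 2) = 5
    // round 1: assigned = [1, 1, 1]  (toAssign = 2)
    // round 2: assigned = [2, 2, 1]  (toAssign = 0)
    // => one executor per worker, with 2, 2 and 1 cores respectively

In the non-spread (packing) branch, the same app would instead take min(coresFree, coresLeft) from each worker in turn, e.g. 4 cores on the first worker and the remaining 1 on the next.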
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
deleted file mode 100644
index 0ae0160..0000000
--- a/core/src/main/scala/spark/deploy/master/MasterArguments.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import spark.util.IntParam
-import spark.Utils
-
-/**
- * Command-line parser for the master.
- */
-private[spark] class MasterArguments(args: Array[String]) {
- var host = Utils.localHostName()
- var port = 7077
- var webUiPort = 8080
-
- // Check for settings in environment variables
- if (System.getenv("SPARK_MASTER_HOST") != null) {
- host = System.getenv("SPARK_MASTER_HOST")
- }
- if (System.getenv("SPARK_MASTER_PORT") != null) {
- port = System.getenv("SPARK_MASTER_PORT").toInt
- }
- if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
- webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
- }
- if (System.getProperty("master.ui.port") != null) {
- webUiPort = System.getProperty("master.ui.port").toInt
- }
-
- parse(args.toList)
-
- def parse(args: List[String]): Unit = args match {
- case ("--ip" | "-i") :: value :: tail =>
- Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
- host = value
- parse(tail)
-
- case ("--host" | "-h") :: value :: tail =>
- Utils.checkHost(value, "Please use hostname " + value)
- host = value
- parse(tail)
-
- case ("--port" | "-p") :: IntParam(value) :: tail =>
- port = value
- parse(tail)
-
- case "--webui-port" :: IntParam(value) :: tail =>
- webUiPort = value
- parse(tail)
-
- case ("--help" | "-h") :: tail =>
- printUsageAndExit(0)
-
- case Nil => {}
-
- case _ =>
- printUsageAndExit(1)
- }
-
- /**
- * Print usage and exit JVM with the given exit code.
- */
- def printUsageAndExit(exitCode: Int) {
- System.err.println(
- "Usage: Master [options]\n" +
- "\n" +
- "Options:\n" +
- " -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +
- " -h HOST, --host HOST Hostname to listen on\n" +
- " -p PORT, --port PORT Port to listen on (default: 7077)\n" +
- " --webui-port PORT Port for web UI (default: 8080)")
- System.exit(exitCode)
- }
-}
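
Two small notes on the parser. First, "-h" is matched by the host case when another argument follows it and by the help case when it is last, since ("--host" | "-h") :: value :: tail needs a following value. Second, spark.util.IntParam is imported but not shown in this commit; it is presumably a string-to-Int extractor along these lines (a sketch, not the actual source):

    object IntParam {
      def unapply(str: String): Option[Int] =
        try { Some(str.toInt) } catch { case _: NumberFormatException => None }
    }

With that, case "--webui-port" :: IntParam(value) :: tail only matches when the port argument parses as an integer.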
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/MasterSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/MasterSource.scala b/core/src/main/scala/spark/deploy/master/MasterSource.scala
deleted file mode 100644
index b8cfa6a..0000000
--- a/core/src/main/scala/spark/deploy/master/MasterSource.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.deploy.master
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import spark.metrics.source.Source
-
-private[spark] class MasterSource(val master: Master) extends Source {
- val metricRegistry = new MetricRegistry()
- val sourceName = "master"
-
- // Gauge for worker numbers in cluster
- metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
- override def getValue: Int = master.workers.size
- })
-
- // Gauge for application numbers in cluster
- metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
- override def getValue: Int = master.apps.size
- })
-
- // Gauge for waiting application numbers in cluster
- metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
- override def getValue: Int = master.waitingApps.size
- })
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
deleted file mode 100644
index 4135cfe..0000000
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-import akka.actor.ActorRef
-import scala.collection.mutable
-import spark.Utils
-
-private[spark] class WorkerInfo(
- val id: String,
- val host: String,
- val port: Int,
- val cores: Int,
- val memory: Int,
- val actor: ActorRef,
- val webUiPort: Int,
- val publicAddress: String) {
-
- Utils.checkHost(host, "Expected hostname")
- assert (port > 0)
-
- var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
- var state: WorkerState.Value = WorkerState.ALIVE
- var coresUsed = 0
- var memoryUsed = 0
-
- var lastHeartbeat = System.currentTimeMillis()
-
- def coresFree: Int = cores - coresUsed
- def memoryFree: Int = memory - memoryUsed
-
- def hostPort: String = {
- assert (port > 0)
- host + ":" + port
- }
-
- def addExecutor(exec: ExecutorInfo) {
- executors(exec.fullId) = exec
- coresUsed += exec.cores
- memoryUsed += exec.memory
- }
-
- def removeExecutor(exec: ExecutorInfo) {
- if (executors.contains(exec.fullId)) {
- executors -= exec.fullId
- coresUsed -= exec.cores
- memoryUsed -= exec.memory
- }
- }
-
- def hasExecutor(app: ApplicationInfo): Boolean = {
- executors.values.exists(_.application == app)
- }
-
- def webUiAddress : String = {
- "http://" + this.publicAddress + ":" + this.webUiPort
- }
-
- def setState(state: WorkerState.Value) = {
- this.state = state
- }
-}
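
The bookkeeping in WorkerInfo keeps coresUsed and memoryUsed in step with the
executor map, so coresFree and memoryFree are O(1) reads rather than sums over
the map. A minimal sketch of the same pattern, with a hypothetical Allocation
type standing in for ExecutorInfo:

    import scala.collection.mutable

    case class Allocation(id: String, cores: Int, memory: Int)  // hypothetical

    class ResourceTracker(val cores: Int, val memory: Int) {
      private val allocations = new mutable.HashMap[String, Allocation]
      var coresUsed = 0
      var memoryUsed = 0

      def coresFree: Int = cores - coresUsed
      def memoryFree: Int = memory - memoryUsed

      def add(a: Allocation) {
        allocations(a.id) = a
        coresUsed += a.cores
        memoryUsed += a.memory
      }

      // Guarding on membership, as removeExecutor does above, keeps the
      // counters consistent even if remove is called twice for the same id.
      def remove(id: String) {
        allocations.remove(id).foreach { a =>
          coresUsed -= a.cores
          memoryUsed -= a.memory
        }
      }
    }
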
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/WorkerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/WorkerState.scala b/core/src/main/scala/spark/deploy/master/WorkerState.scala
deleted file mode 100644
index 3e50b77..0000000
--- a/core/src/main/scala/spark/deploy/master/WorkerState.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master
-
-private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
- type WorkerState = Value
-
- val ALIVE, DEAD, DECOMMISSIONED = Value
-}
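
The Enumeration constructor taking explicit value names, as used above, was
deprecated in later Scala releases; Value derives the names from the field
names via reflection, so the same three-state enum can be written with no
arguments:

    object WorkerState extends Enumeration {
      type WorkerState = Value

      val ALIVE, DEAD, DECOMMISSIONED = Value
    }
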
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
deleted file mode 100644
index 2ad98f7..0000000
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master.ui
-
-import scala.xml.Node
-
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.duration._
-
-import javax.servlet.http.HttpServletRequest
-
-import net.liftweb.json.JsonAST.JValue
-
-import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import spark.deploy.JsonProtocol
-import spark.deploy.master.ExecutorInfo
-import spark.ui.UIUtils
-import spark.Utils
-
-private[spark] class ApplicationPage(parent: MasterWebUI) {
- val master = parent.masterActorRef
- implicit val timeout = parent.timeout
-
- /** Executor details for a particular application, rendered as JSON */
- def renderJson(request: HttpServletRequest): JValue = {
- val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
- val app = state.activeApps.find(_.id == appId).getOrElse({
- state.completedApps.find(_.id == appId).getOrElse(null)
- })
- JsonProtocol.writeApplicationInfo(app)
- }
-
- /** Executor details for a particular application */
- def render(request: HttpServletRequest): Seq[Node] = {
- val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
- val app = state.activeApps.find(_.id == appId).getOrElse({
- state.completedApps.find(_.id == appId).getOrElse(null)
- })
-
- val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
- val executors = app.executors.values.toSeq
- val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
-
- val content =
- <div class="row-fluid">
- <div class="span12">
- <ul class="unstyled">
- <li><strong>ID:</strong> {app.id}</li>
- <li><strong>Name:</strong> {app.desc.name}</li>
- <li><strong>User:</strong> {app.desc.user}</li>
- <li><strong>Cores:</strong>
- {
- if (app.desc.maxCores == Integer.MAX_VALUE) {
- "Unlimited (%s granted)".format(app.coresGranted)
- } else {
- "%s (%s granted, %s left)".format(
- app.desc.maxCores, app.coresGranted, app.coresLeft)
- }
- }
- </li>
- <li>
- <strong>Executor Memory:</strong>
- {Utils.megabytesToString(app.desc.memoryPerSlave)}
- </li>
- <li><strong>Submit Date:</strong> {app.submitDate}</li>
- <li><strong>State:</strong> {app.state}</li>
- <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li>
- </ul>
- </div>
- </div>
-
- <div class="row-fluid"> <!-- Executors -->
- <div class="span12">
- <h4> Executor Summary </h4>
- {executorTable}
- </div>
- </div>;
- UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
- }
-
- def executorRow(executor: ExecutorInfo): Seq[Node] = {
- <tr>
- <td>{executor.id}</td>
- <td>
- <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
- </td>
- <td>{executor.cores}</td>
- <td>{executor.memory}</td>
- <td>{executor.state}</td>
- <td>
- <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
- <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
- </td>
- </tr>
- }
-}
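
Both render methods above use the classic Akka ask pattern: `?` sends a message
and returns a Future for the reply, mapTo fixes its static type, and
Await.result blocks the rendering thread until the Master answers or the
timeout fires. A minimal self-contained sketch in the same Akka 2.0-era style,
with a hypothetical StateHolder actor standing in for the Master:

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.dispatch.Await
    import akka.pattern.ask
    import akka.util.Timeout
    import akka.util.duration._

    case object RequestState
    case class StateResponse(appCount: Int)

    class StateHolder extends Actor {  // hypothetical stand-in for the Master
      def receive = { case RequestState => sender ! StateResponse(42) }
    }

    object AskSketch {
      def main(args: Array[String]) {
        val system = ActorSystem("sketch")
        val holder = system.actorOf(Props[StateHolder], "holder")
        implicit val timeout = Timeout(10 seconds)

        // ask returns a Future[Any]; mapTo narrows it, Await blocks on it.
        val stateFuture = (holder ? RequestState)(timeout).mapTo[StateResponse]
        val state = Await.result(stateFuture, 30 seconds)
        println("apps: " + state.appCount)

        system.shutdown()
      }
    }
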
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
deleted file mode 100644
index 093e523..0000000
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.duration._
-
-import net.liftweb.json.JsonAST.JValue
-
-import spark.Utils
-import spark.deploy.DeployWebUI
-import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import spark.deploy.JsonProtocol
-import spark.deploy.master.{ApplicationInfo, WorkerInfo}
-import spark.ui.UIUtils
-
-private[spark] class IndexPage(parent: MasterWebUI) {
- val master = parent.masterActorRef
- implicit val timeout = parent.timeout
-
- def renderJson(request: HttpServletRequest): JValue = {
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
- JsonProtocol.writeMasterState(state)
- }
-
- /** Index view listing applications and executors */
- def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
-
- val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
- val workers = state.workers.sortBy(_.id)
- val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
-
- val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
- "State", "Duration")
- val activeApps = state.activeApps.sortBy(_.startTime).reverse
- val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
- val completedApps = state.completedApps.sortBy(_.endTime).reverse
- val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
-
- val content =
- <div class="row-fluid">
- <div class="span12">
- <ul class="unstyled">
- <li><strong>URL:</strong> {state.uri}</li>
- <li><strong>Workers:</strong> {state.workers.size}</li>
- <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
- {state.workers.map(_.coresUsed).sum} Used</li>
- <li><strong>Memory:</strong>
- {Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
- {Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
- <li><strong>Applications:</strong>
- {state.activeApps.size} Running,
- {state.completedApps.size} Completed </li>
- </ul>
- </div>
- </div>
-
- <div class="row-fluid">
- <div class="span12">
- <h4> Workers </h4>
- {workerTable}
- </div>
- </div>
-
- <div class="row-fluid">
- <div class="span12">
- <h4> Running Applications </h4>
-
- {activeAppsTable}
- </div>
- </div>
-
- <div class="row-fluid">
- <div class="span12">
- <h4> Completed Applications </h4>
- {completedAppsTable}
- </div>
- </div>;
- UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
- }
-
- def workerRow(worker: WorkerInfo): Seq[Node] = {
- <tr>
- <td>
- <a href={worker.webUiAddress}>{worker.id}</a>
- </td>
- <td>{worker.host}:{worker.port}</td>
- <td>{worker.state}</td>
- <td>{worker.cores} ({worker.coresUsed} Used)</td>
- <td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
- {Utils.megabytesToString(worker.memory)}
- ({Utils.megabytesToString(worker.memoryUsed)} Used)
- </td>
- </tr>
- }
-
- def appRow(app: ApplicationInfo): Seq[Node] = {
- <tr>
- <td>
- <a href={"app?appId=" + app.id}>{app.id}</a>
- </td>
- <td>
- <a href={app.appUiUrl}>{app.desc.name}</a>
- </td>
- <td>
- {app.coresGranted}
- </td>
- <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
- {Utils.megabytesToString(app.desc.memoryPerSlave)}
- </td>
- <td>{DeployWebUI.formatDate(app.submitDate)}</td>
- <td>{app.desc.user}</td>
- <td>{app.state.toString}</td>
- <td>{DeployWebUI.formatDuration(app.duration)}</td>
- </tr>
- }
-}
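
The pages above all use the same listing-table idiom: a header sequence, a
row-rendering function, and the data to render. A self-contained sketch of that
idiom with Scala XML literals, where listingTable is a simplified stand-in for
spark.ui.UIUtils.listingTable and Worker is a hypothetical row type:

    import scala.xml.Node

    object TableSketch {
      def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node],
                          rows: Seq[T]): Seq[Node] = {
        <table class="table">
          <thead><tr>{headers.map(h => <th>{h}</th>)}</tr></thead>
          <tbody>{rows.map(makeRow)}</tbody>
        </table>
      }

      case class Worker(id: String, cores: Int)

      def workerRow(w: Worker): Seq[Node] =
        <tr><td>{w.id}</td><td>{w.cores}</td></tr>

      def main(args: Array[String]) {
        println(listingTable(Seq("Id", "Cores"), workerRow,
          Seq(Worker("worker-1", 4), Worker("worker-2", 8))))
      }
    }
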
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
deleted file mode 100644
index c91e1db..0000000
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.master.ui
-
-import akka.util.Duration
-
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.server.{Handler, Server}
-
-import spark.{Logging, Utils}
-import spark.deploy.master.Master
-import spark.ui.JettyUtils
-import spark.ui.JettyUtils._
-
-/**
- * Web UI server for the standalone master.
- */
-private[spark]
-class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
- val host = Utils.localHostName()
- val port = requestedPort
-
- val masterActorRef = master.self
-
- var server: Option[Server] = None
- var boundPort: Option[Int] = None
-
- val applicationPage = new ApplicationPage(this)
- val indexPage = new IndexPage(this)
-
- def start() {
- try {
- val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
- server = Some(srv)
- boundPort = Some(bPort)
- logInfo("Started Master web UI at http://%s:%d".format(host, boundPort.get))
- } catch {
- case e: Exception =>
- logError("Failed to create Master JettyUtils", e)
- System.exit(1)
- }
- }
-
- val metricsHandlers = master.masterMetricsSystem.getServletHandlers ++
- master.applicationMetricsSystem.getServletHandlers
-
- val handlers = metricsHandlers ++ Array[(String, Handler)](
- ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
- ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
- ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
- ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
- ("*", (request: HttpServletRequest) => indexPage.render(request))
- )
-
- def stop() {
- server.foreach(_.stop())
- }
-}
-
-private[spark] object MasterWebUI {
- val STATIC_RESOURCE_DIR = "spark/ui/static"
-}
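
The handler list above maps URL path prefixes to render functions, which
JettyUtils then wires into an embedded Jetty server. A minimal sketch of that
embedding, independent of spark.ui.JettyUtils (the port and placeholder
response are arbitrary; the real startJettyServer additionally retries on
successive ports when binding fails):

    import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

    import org.eclipse.jetty.server.{Request, Server}
    import org.eclipse.jetty.server.handler.AbstractHandler

    object JettySketch {
      def main(args: Array[String]) {
        val server = new Server(8080)
        server.setHandler(new AbstractHandler {
          // Jetty calls this for every request; setHandled stops the chain.
          def handle(target: String, baseRequest: Request,
                     request: HttpServletRequest, response: HttpServletResponse) {
            response.setContentType("text/plain")
            response.getWriter.println("placeholder response for " + target)
            baseRequest.setHandled(true)
          }
        })
        server.start()
        server.join()  // block the main thread while the server runs
      }
    }
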
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
deleted file mode 100644
index 34665ce..0000000
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.worker
-
-import java.io._
-import java.lang.System.getenv
-
-import akka.actor.ActorRef
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-
-import spark.{Utils, Logging}
-import spark.deploy.{ExecutorState, ApplicationDescription}
-import spark.deploy.DeployMessages.ExecutorStateChanged
-
-/**
- * Manages the execution of one executor process.
- */
-private[spark] class ExecutorRunner(
- val appId: String,
- val execId: Int,
- val appDesc: ApplicationDescription,
- val cores: Int,
- val memory: Int,
- val worker: ActorRef,
- val workerId: String,
- val host: String,
- val sparkHome: File,
- val workDir: File)
- extends Logging {
-
- val fullId = appId + "/" + execId
- var workerThread: Thread = null
- var process: Process = null
- var shutdownHook: Thread = null
-
- private def getAppEnv(key: String): Option[String] =
- appDesc.command.environment.get(key).orElse(Option(getenv(key)))
-
- def start() {
- workerThread = new Thread("ExecutorRunner for " + fullId) {
- override def run() { fetchAndRunExecutor() }
- }
- workerThread.start()
-
- // Shutdown hook that kills the child process on shutdown.
- shutdownHook = new Thread() {
- override def run() {
- if (process != null) {
- logInfo("Shutdown hook killing child process.")
- process.destroy()
- process.waitFor()
- }
- }
- }
- Runtime.getRuntime.addShutdownHook(shutdownHook)
- }
-
- /** Stop this executor runner, including killing the process it launched */
- def kill() {
- if (workerThread != null) {
- workerThread.interrupt()
- workerThread = null
- if (process != null) {
- logInfo("Killing process!")
- process.destroy()
- process.waitFor()
- }
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
- }
- }
-
- /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
- def substituteVariables(argument: String): String = argument match {
- case "{{EXECUTOR_ID}}" => execId.toString
- case "{{HOSTNAME}}" => host
- case "{{CORES}}" => cores.toString
- case other => other
- }
-
- def buildCommandSeq(): Seq[String] = {
- val command = appDesc.command
- val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
- // SPARK-698: do not call the run.cmd script, as process.destroy()
- // fails to kill a process tree on Windows
- Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
- command.arguments.map(substituteVariables)
- }
-
- /**
- * Attention: this must always be aligned with the environment variables in the run scripts and
- * the way the JAVA_OPTS are assembled there.
- */
- def buildJavaOpts(): Seq[String] = {
- val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
- .map(p => List("-Djava.library.path=" + p))
- .getOrElse(Nil)
- val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
- val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
- val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
-
- // Figure out our classpath with the external compute-classpath script
- val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
- val classPath = Utils.executeAndGetOutput(
- Seq(sparkHome + "/bin/compute-classpath" + ext),
- extraEnvironment=appDesc.command.environment)
-
- Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
- }
-
- /** Spawn a thread that will redirect a given stream to a file */
- def redirectStream(in: InputStream, file: File) {
- val out = new FileOutputStream(file, true)
- new Thread("redirect output to " + file) {
- override def run() {
- try {
- Utils.copyStream(in, out, true)
- } catch {
- case e: IOException =>
- logInfo("Redirection to " + file + " closed: " + e.getMessage)
- }
- }
- }.start()
- }
-
- /**
- * Download and run the executor described in our ApplicationDescription
- */
- def fetchAndRunExecutor() {
- try {
- // Create the executor's working directory
- val executorDir = new File(workDir, appId + "/" + execId)
- if (!executorDir.mkdirs()) {
- throw new IOException("Failed to create directory " + executorDir)
- }
-
- // Launch the process
- val command = buildCommandSeq()
- logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
- val builder = new ProcessBuilder(command: _*).directory(executorDir)
- val env = builder.environment()
- for ((key, value) <- appDesc.command.environment) {
- env.put(key, value)
- }
- // In case we are running this from within the Spark Shell, avoid creating a "scala"
- // parent process for the executor command
- env.put("SPARK_LAUNCH_WITH_SCALA", "0")
- process = builder.start()
-
- val header = "Spark Executor Command: %s\n%s\n\n".format(
- command.mkString("\"", "\" \"", "\""), "=" * 40)
-
- // Redirect its stdout and stderr to files
- val stdout = new File(executorDir, "stdout")
- redirectStream(process.getInputStream, stdout)
-
- val stderr = new File(executorDir, "stderr")
- Files.write(header, stderr, Charsets.UTF_8)
- redirectStream(process.getErrorStream, stderr)
-
- // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
- // long-lived processes only. However, in the future, we might restart the executor a few
- // times on the same machine.
- val exitCode = process.waitFor()
- val message = "Command exited with code " + exitCode
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
- Some(exitCode))
- } catch {
- case interrupted: InterruptedException =>
- logInfo("Runner thread for executor " + fullId + " interrupted")
-
- case e: Exception => {
- logError("Error running executor", e)
- if (process != null) {
- process.destroy()
- }
- val message = e.getClass + ": " + e.getMessage
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
- }
- }
- }
-}
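
ExecutorRunner combines two standard JVM patterns: java.lang.ProcessBuilder for
launching the child with a controlled environment and working directory, and
one thread per output stream copying bytes to a log file (a child whose stdout
and stderr are never drained can block on a full pipe buffer). A minimal sketch
of both, with an arbitrary echo command standing in for the executor launch:

    import java.io.{File, FileOutputStream, IOException, InputStream}

    object ProcessSketch {
      // Mirrors redirectStream above: copy until the child closes its end.
      def redirect(in: InputStream, file: File) {
        val out = new FileOutputStream(file, true)
        new Thread("redirect to " + file) {
          override def run() {
            try {
              val buf = new Array[Byte](8192)
              var n = in.read(buf)
              while (n != -1) {
                out.write(buf, 0, n)
                n = in.read(buf)
              }
            } catch {
              case e: IOException =>  // expected once the child exits
            } finally {
              out.close()
            }
          }
        }.start()
      }

      def main(args: Array[String]) {
        val builder = new ProcessBuilder("echo", "hello")  // stand-in command
        builder.environment().put("SPARK_SKETCH", "1")     // per-child env, as above
        val process = builder.start()
        redirect(process.getInputStream, new File("stdout.log"))
        redirect(process.getErrorStream, new File("stderr.log"))
        println("exit code: " + process.waitFor())
      }
    }
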