You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/08/26 19:25:41 UTC
[6/7] spark git commit: [SPARK-16967] move mesos to module
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
deleted file mode 100644
index bb6f6b3..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ /dev/null
@@ -1,745 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.io.File
-import java.util.{Collections, Date, List => JList}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.mesos.{Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
-import org.apache.mesos.Protos.Environment.Variable
-import org.apache.mesos.Protos.TaskStatus.Reason
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.Utils
-
/**
 * Mutable record of the Mesos task that runs one submitted Spark driver.
 *
 * @param driverDescription the driver description as submitted through
 *                          [[org.apache.spark.deploy.rest.mesos.MesosRestServer]]
 * @param taskId            Mesos TaskID generated for the task
 * @param slaveId           ID of the slave the task was assigned to
 * @param mesosTaskStatus   most recent task status update received from Mesos, if any
 * @param startDate         time the task was launched
 * @param finishDate        time the task finished, once it has
 * @param frameworkId       Mesos framework ID the driver registers with
 */
private[spark] class MesosClusterSubmissionState(
    val driverDescription: MesosDriverDescription,
    val taskId: TaskID,
    val slaveId: SlaveID,
    var mesosTaskStatus: Option[TaskStatus],
    var startDate: Date,
    var finishDate: Option[Date],
    val frameworkId: String)
  extends Serializable {

  /**
   * Returns a shallow snapshot of this record: the current field values are handed to a new
   * instance, while referenced objects (e.g. the `Date`s) are shared, not cloned.
   */
  def copy(): MesosClusterSubmissionState = new MesosClusterSubmissionState(
    driverDescription = driverDescription,
    taskId = taskId,
    slaveId = slaveId,
    mesosTaskStatus = mesosTaskStatus,
    startDate = startDate,
    finishDate = finishDate,
    frameworkId = frameworkId)
}
-
/**
 * Retry bookkeeping for a supervised driver: when it should next be scheduled and the data
 * needed to continue its exponential backoff.
 * Not thread-safe; callers are expected to synchronize access themselves.
 *
 * @param lastFailureStatus task status reported by Mesos for the most recent failure
 * @param retries           how many times the driver has been retried so far
 * @param nextRetry         earliest time at which the driver should be retried again
 * @param waitTime          backoff the driver was scheduled to wait before this retry
 */
private[spark] class MesosClusterRetryState(
    val lastFailureStatus: TaskStatus,
    val retries: Int,
    val nextRetry: Date,
    val waitTime: Int) extends Serializable {

  /** Returns a new instance carrying the same field values. */
  def copy(): MesosClusterRetryState = {
    new MesosClusterRetryState(
      lastFailureStatus = lastFailureStatus,
      retries = retries,
      nextRetry = nextRetry,
      waitTime = waitTime)
  }
}
-
/**
 * Point-in-time view of the whole cluster scheduler, currently used to render the UI.
 *
 * @param frameworkId         Mesos framework ID of the cluster scheduler
 * @param masterUrl           URL of the Mesos master, when known
 * @param queuedDrivers       drivers waiting to be launched
 * @param launchedDrivers     drivers that were launched or are running
 * @param finishedDrivers     drivers that have terminated
 * @param pendingRetryDrivers supervised drivers waiting to be retried
 */
private[spark] class MesosClusterSchedulerState(
    val frameworkId: String,
    val masterUrl: Option[String],
    val queuedDrivers: Iterable[MesosDriverDescription],
    val launchedDrivers: Iterable[MesosClusterSubmissionState],
    val finishedDrivers: Iterable[MesosClusterSubmissionState],
    val pendingRetryDrivers: Iterable[MesosDriverDescription])
-
/**
 * Everything the UI needs to display a single Mesos-launched driver.
 *
 * @param state           label describing where the driver currently is (e.g. "QUEUED")
 * @param description     the submitted driver description
 * @param submissionState launch state of the driver, present only once it has been launched
 */
private[spark] class MesosDriverState(
    val state: String,
    val description: MesosDriverDescription,
    val submissionState: Option[MesosClusterSubmissionState] = None)
-
/**
 * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
 * as Mesos tasks in a Mesos cluster.
 * All drivers are launched asynchronously by the framework; each one is eventually started by
 * one of the slaves in the cluster. The results of the driver are stored in the slave's task
 * sandbox, which is accessible by visiting the Mesos UI.
 * This scheduler supports recovery by persisting all its state, and performs task reconciliation
 * on recovery to fetch the latest state of all the drivers from the Mesos master.
 *
 * The driver collections below are mutated only while holding `stateLock`; the
 * `get*DriversSize` accessors used for metrics read them without the lock.
 *
 * @param engineFactory factory for the persistence engines that store scheduler state
 * @param conf          the dispatcher's configuration
 */
private[spark] class MesosClusterScheduler(
    engineFactory: MesosClusterPersistenceEngineFactory,
    conf: SparkConf)
  extends Scheduler with MesosSchedulerUtils {
  // Passed to createSchedulerDriver in start(); presumably assigned by the owner beforehand.
  var frameworkUrl: String = _
  private val metricsSystem =
    MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
  private val master = conf.get("spark.master")
  private val appName = conf.get("spark.app.name")
  private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
  private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
  // Upper bound (seconds) for the exponential retry backoff of supervised drivers.
  private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
  private val schedulerState = engineFactory.createEngine("scheduler")
  private val stateLock = new Object()
  private val finishedDrivers =
    new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
  private var frameworkId: String = null
  // Holds all the launched drivers and current launch state, keyed by driver id.
  private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
  // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
  // All drivers that are loaded after failover are added here, as we need to get the latest
  // state of the tasks from Mesos.
  private val pendingRecover = new mutable.HashMap[String, SlaveID]()
  // Stores all the submitted drivers that haven't been launched yet.
  private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
  // All supervised drivers that are waiting to retry after termination.
  private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
  private val queuedDriversState = engineFactory.createEngine("driverQueue")
  private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
  private val pendingRetryDriversState = engineFactory.createEngine("retryList")
  // Flag marking whether the scheduler is ready to be called; it stays false until start()
  // has registered the scheduler with the Mesos master.
  @volatile protected var ready = false
  private var masterInfo: Option[MasterInfo] = None

  /**
   * Queues a driver for launch.
   * The request fails when the scheduler is not ready or the capacity is reached; otherwise
   * the description is persisted and appended to the queue.
   */
  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
    val c = new CreateSubmissionResponse
    if (!ready) {
      c.success = false
      c.message = "Scheduler is not ready to take requests"
    } else {
      // No `return` inside synchronized: it would be a non-local return (exception-based).
      stateLock.synchronized {
        if (isQueueFull()) {
          c.success = false
          c.message = "Already reached maximum submission size"
        } else {
          c.submissionId = desc.submissionId
          queuedDriversState.persist(desc.submissionId, desc)
          queuedDrivers += desc
          c.success = true
        }
      }
    }
    c
  }

  /**
   * Kills or removes the driver with the given submission id, wherever it currently is.
   * Already-terminated or unknown drivers yield an unsuccessful response.
   */
  def killDriver(submissionId: String): KillSubmissionResponse = {
    val k = new KillSubmissionResponse
    if (!ready) {
      k.success = false
      k.message = "Scheduler is not ready to take requests"
    } else {
      k.submissionId = submissionId
      stateLock.synchronized {
        // We look for the requested driver in the following places:
        // 1. Check if submission is running or launched.
        // 2. Check if it's still queued.
        // 3. Check if it's in the retry list.
        // 4. Check if it has already completed.
        if (launchedDrivers.contains(submissionId)) {
          val task = launchedDrivers(submissionId)
          mesosDriver.killTask(task.taskId)
          k.success = true
          k.message = "Killing running driver"
        } else if (removeFromQueuedDrivers(submissionId)) {
          k.success = true
          k.message = "Removed driver while it's still pending"
        } else if (removeFromPendingRetryDrivers(submissionId)) {
          k.success = true
          k.message = "Removed driver while it's being retried"
        } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
          k.success = false
          k.message = "Driver already terminated"
        } else {
          k.success = false
          k.message = "Cannot find driver"
        }
      }
    }
    k
  }

  /**
   * Reports whether the driver is QUEUED, RUNNING, FINISHED, RETRYING or NOT_FOUND.
   * When available, the message carries the driver's last known Mesos task status.
   */
  def getDriverStatus(submissionId: String): SubmissionStatusResponse = {
    val s = new SubmissionStatusResponse
    if (!ready) {
      s.success = false
      s.message = "Scheduler is not ready to take requests"
    } else {
      s.submissionId = submissionId
      stateLock.synchronized {
        if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
          s.success = true
          s.driverState = "QUEUED"
        } else if (launchedDrivers.contains(submissionId)) {
          s.success = true
          s.driverState = "RUNNING"
          launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
        } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
          s.success = true
          s.driverState = "FINISHED"
          finishedDrivers
            .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
            .foreach(state => s.message = state.toString)
        } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
          // Drivers only enter the retry list with a retryState set (see statusUpdate).
          val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
            .get.retryState.get.lastFailureStatus
          s.success = true
          s.driverState = "RETRYING"
          s.message = status.toString
        } else {
          s.success = false
          s.driverState = "NOT_FOUND"
        }
      }
    }
    s
  }

  /**
   * Gets the driver state to be displayed on the Web UI.
   */
  def getDriverState(submissionId: String): Option[MesosDriverState] = {
    stateLock.synchronized {
      queuedDrivers.find(_.submissionId.equals(submissionId))
        .map(d => new MesosDriverState("QUEUED", d))
        .orElse(launchedDrivers.get(submissionId)
          .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d))))
        .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
          .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d))))
        .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId))
          .map(d => new MesosDriverState("RETRYING", d)))
    }
  }

  // NOTE(review): despite the name `queuedCapacity`, this compares against the number of
  // *launched* drivers, not queued ones. Kept unchanged to preserve behavior; confirm intent.
  private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity

  /**
   * Recover scheduler state that is persisted.
   * We still need to do task reconciliation to be up to date with the latest task states,
   * as they might have changed while the scheduler was failing over.
   */
  private def recoverState(): Unit = {
    stateLock.synchronized {
      launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
        launchedDrivers(state.taskId.getValue) = state
        pendingRecover(state.taskId.getValue) = state.slaveId
      }
      queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
      // There is a potential timing issue where a queued driver might have been launched
      // but the scheduler shut down before the queued driver was removed from the queue.
      // We mitigate this by walking through all queued drivers and removing those that
      // were already launched.
      queuedDrivers
        .filter(d => launchedDrivers.contains(d.submissionId))
        .foreach(d => removeFromQueuedDrivers(d.submissionId))
      pendingRetryDriversState.fetchAll[MesosDriverDescription]()
        .foreach(s => pendingRetryDrivers += s)
      // TODO: Consider storing finished drivers so we can show them on the UI after
      // failover. For now we clear the history on each recovery.
      finishedDrivers.clear()
    }
  }

  /**
   * Starts the cluster scheduler and waits until the scheduler is registered.
   * This also marks the scheduler as ready for requests.
   */
  def start(): Unit = {
    // TODO: Implement leader election to make sure only one framework running in the cluster.
    val fwId = schedulerState.fetch[String]("frameworkId")
    fwId.foreach { id =>
      frameworkId = id
    }
    recoverState()
    metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
    metricsSystem.start()
    val driver = createSchedulerDriver(
      master,
      MesosClusterScheduler.this,
      Utils.getCurrentUserName(),
      appName,
      conf,
      Some(frameworkUrl),
      Some(true),
      Some(Integer.MAX_VALUE),
      fwId)

    startScheduler(driver)
    ready = true
  }

  /** Stops accepting requests, flushes and stops metrics, then stops the Mesos driver. */
  def stop(): Unit = {
    ready = false
    metricsSystem.report()
    metricsSystem.stop()
    mesosDriver.stop(true)
  }

  /**
   * Persists a new framework id if it changed, then asks Mesos to reconcile every task that
   * was recovered from persisted state.
   */
  override def registered(
      driver: SchedulerDriver,
      newFrameworkId: FrameworkID,
      masterInfo: MasterInfo): Unit = {
    logInfo("Registered as framework ID " + newFrameworkId.getValue)
    if (newFrameworkId.getValue != frameworkId) {
      frameworkId = newFrameworkId.getValue
      schedulerState.persist("frameworkId", frameworkId)
    }
    markRegistered()

    stateLock.synchronized {
      this.masterInfo = Some(masterInfo)
      if (pendingRecover.nonEmpty) {
        // Start task reconciliation if we need to recover.
        val statuses = pendingRecover.collect {
          case (taskId, slaveId) =>
            val newStatus = TaskStatus.newBuilder()
              .setTaskId(TaskID.newBuilder().setValue(taskId).build())
              .setSlaveId(slaveId)
              .setState(MesosTaskState.TASK_STAGING)
              .build()
            launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus))
              .getOrElse(newStatus)
        }
        // TODO: Page the status updates to avoid trying to reconcile
        // a large amount of tasks at once.
        driver.reconcileTasks(statuses.toSeq.asJava)
      }
    }
  }

  private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
    desc.conf.getOption("spark.executor.uri")
      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
  }

  private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
    s"${frameworkId}-${desc.submissionId}"
  }

  /** Returns a copy of `m` where key `k` maps to `f` applied to its value (or `default`). */
  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
    m.updated(k, f(m.getOrElse(k, default)))
  }

  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
    // TODO(mgummelt): Don't do this here. This should be passed as a --conf
    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
    )

    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv

    val envBuilder = Environment.newBuilder()
    env.foreach { case (k, v) =>
      envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
    }
    envBuilder.build()
  }

  /** URIs Mesos should fetch into the driver sandbox: the jar, configured URIs, executor URI. */
  private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
    val confUris = List(conf.getOption("spark.mesos.uris"),
      desc.conf.getOption("spark.mesos.uris"),
      desc.conf.getOption("spark.submit.pyFiles")).flatMap(
      _.map(_.split(",").map(_.trim))
    ).flatten

    val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")

    ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
      CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
  }

  /** Builds the shell command line that runs spark-submit for the driver inside its sandbox. */
  private def getDriverCommandValue(desc: MesosDriverDescription): String = {
    val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
    val executorUri = getDriverExecutorURI(desc)
    // Gets the path to run spark-submit, and the path to the Mesos sandbox.
    val (executable, sandboxPath) = if (dockerDefined) {
      // Application jar is automatically downloaded in the mounted sandbox by Mesos,
      // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
      ("./bin/spark-submit", "$MESOS_SANDBOX")
    } else if (executorUri.isDefined) {
      val folderBasename = executorUri.get.split('/').last.split('.').head

      val entries = conf.getOption("spark.executor.extraLibraryPath")
        .map(path => Seq(path) ++ desc.command.libraryPathEntries)
        .getOrElse(desc.command.libraryPathEntries)

      val prefixEnv = if (!entries.isEmpty) Utils.libraryPathEnvPrefix(entries) else ""

      val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
      // Sandbox path points to the parent folder as we chdir into the folderBasename.
      (cmdExecutable, "..")
    } else {
      val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
        .orElse(conf.getOption("spark.home"))
        .orElse(Option(System.getenv("SPARK_HOME")))
        .getOrElse {
          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
        }
      val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath
      // Sandbox points to the current directory by default with Mesos.
      (cmdExecutable, ".")
    }
    val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
    val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
    val appArguments = desc.command.arguments.mkString(" ")

    s"$executable $cmdOptions $primaryResource $appArguments"
  }

  private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
    val builder = CommandInfo.newBuilder()
    builder.setValue(getDriverCommandValue(desc))
    builder.setEnvironment(getDriverEnvironment(desc))
    builder.addAllUris(getDriverUris(desc).asJava)
    builder.build()
  }

  /** spark-submit options for the driver, including the replicated `--conf` settings. */
  private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
    var options = Seq(
      "--name", desc.conf.get("spark.app.name"),
      "--master", s"mesos://${conf.get("spark.master")}",
      "--driver-cores", desc.cores.toString,
      "--driver-memory", s"${desc.mem}M")

    // Assume empty main class means we're running python
    if (!desc.command.mainClass.equals("")) {
      options ++= Seq("--class", desc.command.mainClass)
    }

    desc.conf.getOption("spark.executor.memory").foreach { v =>
      options ++= Seq("--executor-memory", v)
    }
    desc.conf.getOption("spark.cores.max").foreach { v =>
      options ++= Seq("--total-executor-cores", v)
    }
    desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
      val formattedFiles = pyFiles.split(",")
        .map { path => new File(sandboxPath, path.split("/").last).toString() }
        .mkString(",")
      options ++= Seq("--py-files", formattedFiles)
    }

    // --conf
    val replicatedOptionsBlacklist = Set(
      "spark.jars", // Avoids duplicate classes in classpath
      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
      "spark.master" // this contains the address of the dispatcher, not master
    )
    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
    val driverConf = desc.conf.getAll
      .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
      .toMap
    // Driver-specific settings override the dispatcher-wide defaults.
    (defaultConf ++ driverConf).foreach { case (key, value) =>
      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }

    options
  }

  /**
   * Escape args for Unix-like shells, unless already quoted by the user.
   * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
   * and http://www.grymoire.com/Unix/Quote.html
   *
   * @param value argument
   * @return escaped argument
   */
  private[scheduler] def shellEscape(value: String): String = {
    val WrappedInQuotes = """^(".+"|'.+')$""".r
    val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r
    value match {
      case WrappedInQuotes(c) => value // The user quoted his args, don't touch it!
      case ShellSpecialChars(c) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\""
      case _: String => value // Don't touch harmless strings
    }
  }

  /** A Mesos offer whose remaining resources shrink as drivers are placed on it. */
  private class ResourceOffer(
      val offerId: OfferID,
      val slaveId: SlaveID,
      var resources: JList[Resource]) {
    override def toString(): String = {
      s"Offer id: ${offerId}, resources: ${resources}"
    }
  }

  /** Carves the driver's cpus/mem out of `offer` and builds the Mesos TaskInfo for it. */
  private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
    val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()

    val (remainingResources, cpuResourcesToUse) =
      partitionResources(offer.resources, "cpus", desc.cores)
    val (finalResources, memResourcesToUse) =
      partitionResources(remainingResources.asJava, "mem", desc.mem)
    offer.resources = finalResources.asJava

    val appName = desc.conf.get("spark.app.name")
    val taskInfo = TaskInfo.newBuilder()
      .setTaskId(taskId)
      .setName(s"Driver for ${appName}")
      .setSlaveId(offer.slaveId)
      .setCommand(buildDriverCommand(desc))
      .addAllResources(cpuResourcesToUse.asJava)
      .addAllResources(memResourcesToUse.asJava)

    desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
        desc.conf,
        taskInfo.getContainerBuilder)
    }

    taskInfo.build
  }

  /**
   * This method takes all the possible candidates and attempts to schedule them with Mesos
   * offers. Every time a new task is scheduled, the afterLaunchCallback is called to perform
   * post-scheduling logic on each task.
   */
  private def scheduleTasks(
      candidates: Seq[MesosDriverDescription],
      afterLaunchCallback: (String) => Boolean,
      currentOffers: List[ResourceOffer],
      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
    for (submission <- candidates) {
      val driverCpu = submission.cores
      val driverMem = submission.mem
      logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
      val offerOption = currentOffers.find { o =>
        getResource(o.resources, "cpus") >= driverCpu &&
        getResource(o.resources, "mem") >= driverMem
      }
      if (offerOption.isEmpty) {
        logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
          s"cpu: $driverCpu, mem: $driverMem")
      } else {
        val offer = offerOption.get
        val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
        val task = createTaskInfo(submission, offer)
        queuedTasks += task
        logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
          submission.submissionId)
        val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
          None, new Date(), None, getDriverFrameworkID(submission))
        launchedDrivers(submission.submissionId) = newState
        launchedDriversState.persist(submission.submissionId, newState)
        afterLaunchCallback(submission.submissionId)
      }
    }
  }

  /**
   * Places retry-ready and queued drivers onto the offered resources, launches them, and
   * declines every offer that received no task.
   */
  override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
    logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
    val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
    val currentTime = new Date()

    val currentOffers = offers.asScala.map {
      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
    }.toList

    stateLock.synchronized {
      // We first schedule all the supervised drivers that are ready to retry.
      // This list will be empty if none of the drivers are marked as supervise.
      val driversToRetry = pendingRetryDrivers.filter { d =>
        d.retryState.get.nextRetry.before(currentTime)
      }

      // Candidates are copied because the callbacks remove them from the source buffers.
      scheduleTasks(
        copyBuffer(driversToRetry),
        removeFromPendingRetryDrivers,
        currentOffers,
        tasks)

      // Then we walk through the queued drivers and try to schedule them.
      scheduleTasks(
        copyBuffer(queuedDrivers),
        removeFromQueuedDrivers,
        currentOffers,
        tasks)
    }
    tasks.foreach { case (offerId, taskInfos) =>
      driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
    }

    for (o <- currentOffers if !tasks.contains(o.offerId)) {
      driver.declineOffer(o.offerId)
    }
  }

  private def copyBuffer(
      buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
    val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
    buffer.copyToBuffer(newBuffer)
    newBuffer
  }

  /** Snapshot of all scheduler state for the UI; collections are copied under the lock. */
  def getSchedulerState(): MesosClusterSchedulerState = {
    stateLock.synchronized {
      new MesosClusterSchedulerState(
        frameworkId,
        masterInfo.map(m => s"http://${m.getIp}:${m.getPort}"),
        copyBuffer(queuedDrivers),
        launchedDrivers.values.map(_.copy()).toList,
        finishedDrivers.map(_.copy()).toList,
        copyBuffer(pendingRetryDrivers))
    }
  }

  override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {}
  override def disconnected(driver: SchedulerDriver): Unit = {}
  override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = {
    logInfo(s"Framework re-registered with master ${masterInfo.getId}")
  }
  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}
  override def error(driver: SchedulerDriver, error: String): Unit = {
    logError("Error received: " + error)
    markErr()
  }

  /**
   * Check if the task state is a recoverable state that we can relaunch the task.
   * Task states like TASK_ERROR are not relaunchable since the task wasn't able
   * to be validated by Mesos.
   */
  private def shouldRelaunch(state: MesosTaskState): Boolean = {
    state == MesosTaskState.TASK_FAILED ||
      state == MesosTaskState.TASK_KILLED ||
      state == MesosTaskState.TASK_LOST
  }

  /**
   * Applies a Mesos status update to the matching launched driver.
   * A supervised driver that failed, was killed or was lost is moved to the retry list with
   * exponential backoff (capped at `maxRetryWaitTime` seconds). Any other finished driver is
   * moved to the bounded finished-drivers history.
   */
  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
    val taskId = status.getTaskId.getValue
    stateLock.synchronized {
      // A `match` replaces the former `return` inside synchronized, which was a non-local
      // return implemented by throwing NonLocalReturnControl through the lock block.
      launchedDrivers.get(taskId) match {
        case None =>
          logError(s"Unable to find driver $taskId in status update")

        case Some(_) if status.getReason == Reason.REASON_RECONCILIATION &&
            !pendingRecover.contains(taskId) =>
          // Task has already received update and no longer requires reconciliation.

        case Some(state) =>
          // Check if the driver is supervise enabled and can be relaunched.
          if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
            removeFromLaunchedDrivers(taskId)
            state.finishDate = Some(new Date())
            val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
            // First retry waits 1s; each later one doubles the wait, capped at the maximum.
            val (retries, waitTimeSec) = retryState
              .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
              .getOrElse { (1, 1) }
            val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)

            val newDriverDescription = state.driverDescription.copy(
              retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
            pendingRetryDrivers += newDriverDescription
            pendingRetryDriversState.persist(taskId, newDriverDescription)
          } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
            removeFromLaunchedDrivers(taskId)
            state.finishDate = Some(new Date())
            // Keep the history bounded by evicting the oldest ~10% once it is full.
            if (finishedDrivers.size >= retainedDrivers) {
              val toRemove = math.max(retainedDrivers / 10, 1)
              finishedDrivers.trimStart(toRemove)
            }
            finishedDrivers += state
          }
          state.mesosTaskStatus = Option(status)
      }
    }
  }

  override def frameworkMessage(
      driver: SchedulerDriver,
      executorId: ExecutorID,
      slaveId: SlaveID,
      message: Array[Byte]): Unit = {}

  override def executorLost(
      driver: SchedulerDriver,
      executorId: ExecutorID,
      slaveId: SlaveID,
      status: Int): Unit = {}

  /** Removes the queued driver (in memory and persisted); returns whether it was found. */
  private def removeFromQueuedDrivers(id: String): Boolean = {
    val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
    if (index != -1) {
      queuedDrivers.remove(index)
      queuedDriversState.expunge(id)
      true
    } else {
      false
    }
  }

  /** Removes the launched driver (in memory and persisted); returns whether it was found. */
  private def removeFromLaunchedDrivers(id: String): Boolean = {
    if (launchedDrivers.remove(id).isDefined) {
      launchedDriversState.expunge(id)
      true
    } else {
      false
    }
  }

  /** Removes the pending-retry driver (in memory and persisted); returns whether it was found. */
  private def removeFromPendingRetryDrivers(id: String): Boolean = {
    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
    if (index != -1) {
      pendingRetryDrivers.remove(index)
      pendingRetryDriversState.expunge(id)
      true
    } else {
      false
    }
  }

  def getQueuedDriversSize: Int = queuedDrivers.size
  def getLaunchedDriversSize: Int = launchedDrivers.size
  def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
deleted file mode 100644
index 1fe9497..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import org.apache.spark.metrics.source.Source
-
/**
 * Metrics source publishing, under the name "mesos_cluster", gauges for the number of
 * queued ("waitingDrivers"), launched ("launchedDrivers") and pending-retry ("retryDrivers")
 * drivers of a [[MesosClusterScheduler]].
 *
 * @param scheduler the cluster scheduler whose driver counts are reported
 */
private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler)
  extends Source {
  override def sourceName: String = "mesos_cluster"

  // Must be a single, stable instance. The gauges below are registered on it, and the
  // metrics system later reads `metricRegistry` to collect them. The previous
  // `def metricRegistry = new MetricRegistry()` returned a fresh, empty registry on every
  // access, so the registrations were lost and no gauges were ever reported.
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("waitingDrivers"), new Gauge[Int] {
    override def getValue: Int = scheduler.getQueuedDriversSize
  })

  metricRegistry.register(MetricRegistry.name("launchedDrivers"), new Gauge[Int] {
    override def getValue: Int = scheduler.getLaunchedDriversSize
  })

  metricRegistry.register(MetricRegistry.name("retryDrivers"), new Gauge[Int] {
    override def getValue: Int = scheduler.getPendingRetryDriversSize
  })
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
deleted file mode 100644
index 6b9313e..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.io.File
-import java.util.{Collections, List => JList}
-import java.util.concurrent.locks.ReentrantLock
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-
-import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointAddress
-import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
-
-/**
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
- * latency.
- *
- * Unfortunately this has a bit of duplication from [[MesosFineGrainedSchedulerBackend]],
- * but it seems hard to remove this.
- */
-private[spark] class MesosCoarseGrainedSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext,
- master: String,
- securityManager: SecurityManager)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
- with org.apache.mesos.Scheduler
- with MesosSchedulerUtils {
-
- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
-
- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
-
- private[this] val shutdownTimeoutMS =
- conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
- .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
-
- // Synchronization protected by stateLock
- private[this] var stopCalled: Boolean = false
-
- // If shuffle service is enabled, the Spark driver will register with the shuffle service.
- // This is for cleaning up shuffle files reliably.
- private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
-
- // Cores we have acquired with each Mesos task ID
- val coresByTaskId = new mutable.HashMap[String, Int]
- var totalCoresAcquired = 0
-
- // SlaveID -> Slave
- // This map accumulates entries for the duration of the job. Slaves are never deleted, because
- // we need to maintain e.g. failure state and connection state.
- private val slaves = new mutable.HashMap[String, Slave]
-
- /**
- * The total number of executors we aim to have. Undefined when not using dynamic allocation.
- * Initially set to 0 when using dynamic allocation, the executor allocation manager will send
- * the real initial limit later.
- */
- private var executorLimitOption: Option[Int] = {
- if (Utils.isDynamicAllocationEnabled(conf)) {
- Some(0)
- } else {
- None
- }
- }
-
- /**
- * Return the current executor limit, which may be [[Int.MaxValue]]
- * before properly initialized.
- */
- private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
-
- // private lock object protecting mutable state above. Using the intrinsic lock
- // may lead to deadlocks since the superclass might also try to lock
- private val stateLock = new ReentrantLock
-
- val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
-
- // Offer constraints
- private val slaveOfferConstraints =
- parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
-
- // Reject offers with mismatched constraints in seconds
- private val rejectOfferDurationForUnmetConstraints =
- getRejectOfferDurationForUnmetConstraints(sc)
-
- // Reject offers when we reached the maximum number of cores for this framework
- private val rejectOfferDurationForReachedMaxCores =
- getRejectOfferDurationForReachedMaxCores(sc)
-
- // A client for talking to the external shuffle service
- private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
- if (shuffleServiceEnabled) {
- Some(getShuffleClient())
- } else {
- None
- }
- }
-
- // This method is factored out for testability
- protected def getShuffleClient(): MesosExternalShuffleClient = {
- new MesosExternalShuffleClient(
- SparkTransportConf.fromSparkConf(conf, "shuffle"),
- securityManager,
- securityManager.isAuthenticationEnabled(),
- securityManager.isSaslEncryptionEnabled())
- }
-
- var nextMesosTaskId = 0
-
- @volatile var appId: String = _
-
- def newMesosTaskId(): String = {
- val id = nextMesosTaskId
- nextMesosTaskId += 1
- id.toString
- }
-
- override def start() {
- super.start()
- val driver = createSchedulerDriver(
- master,
- MesosCoarseGrainedSchedulerBackend.this,
- sc.sparkUser,
- sc.appName,
- sc.conf,
- sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
- None,
- None,
- sc.conf.getOption("spark.mesos.driver.frameworkId")
- )
-
- unsetFrameworkID(sc)
- startScheduler(driver)
- }
-
- def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
- val environment = Environment.newBuilder()
- val extraClassPath = conf.getOption("spark.executor.extraClassPath")
- extraClassPath.foreach { cp =>
- environment.addVariables(
- Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
- }
- val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
-
- // Set the environment variable through a command prefix
- // to append to the existing value of the variable
- val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
- Utils.libraryPathEnvPrefix(Seq(p))
- }.getOrElse("")
-
- environment.addVariables(
- Environment.Variable.newBuilder()
- .setName("SPARK_EXECUTOR_OPTS")
- .setValue(extraJavaOpts)
- .build())
-
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
-
- val uri = conf.getOption("spark.executor.uri")
- .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
-
- if (uri.isEmpty) {
- val executorSparkHome = conf.getOption("spark.mesos.executor.home")
- .orElse(sc.getSparkHome())
- .getOrElse {
- throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
- }
- val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
- command.setValue(
- "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
- .format(prefixEnv, runScript) +
- s" --driver-url $driverURL" +
- s" --executor-id $taskId" +
- s" --hostname ${offer.getHostname}" +
- s" --cores $numCores" +
- s" --app-id $appId")
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.get.split('/').last.split('.').head
- command.setValue(
- s"cd $basename*; $prefixEnv " +
- "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
- s" --driver-url $driverURL" +
- s" --executor-id $taskId" +
- s" --hostname ${offer.getHostname}" +
- s" --cores $numCores" +
- s" --app-id $appId")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
- }
-
- conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
-
- command.build()
- }
-
- protected def driverURL: String = {
- if (conf.contains("spark.testing")) {
- "driverURL"
- } else {
- RpcEndpointAddress(
- conf.get("spark.driver.host"),
- conf.get("spark.driver.port").toInt,
- CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
- }
- }
-
- override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
-
- override def registered(
- d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- appId = frameworkId.getValue
- mesosExternalShuffleClient.foreach(_.init(appId))
- markRegistered()
- }
-
- override def sufficientResourcesRegistered(): Boolean = {
- totalCoresAcquired >= maxCores * minRegisteredRatio
- }
-
- override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
-
- override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
-
- /**
- * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
- * unless we've already launched more than we wanted to.
- */
- override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
- stateLock.synchronized {
- if (stopCalled) {
- logDebug("Ignoring offers during shutdown")
- // Driver should simply return a stopped status on race
- // condition between this.stop() and completing here
- offers.asScala.map(_.getId).foreach(d.declineOffer)
- return
- }
-
- logDebug(s"Received ${offers.size} resource offers.")
-
- val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
- val offerAttributes = toAttributeMap(offer.getAttributesList)
- matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
- }
-
- declineUnmatchedOffers(d, unmatchedOffers)
- handleMatchedOffers(d, matchedOffers)
- }
- }
-
- private def declineUnmatchedOffers(
- d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
- offers.foreach { offer =>
- declineOffer(d, offer, Some("unmet constraints"),
- Some(rejectOfferDurationForUnmetConstraints))
- }
- }
-
- private def declineOffer(
- d: org.apache.mesos.SchedulerDriver,
- offer: Offer,
- reason: Option[String] = None,
- refuseSeconds: Option[Long] = None): Unit = {
-
- val id = offer.getId.getValue
- val offerAttributes = toAttributeMap(offer.getAttributesList)
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus")
- val ports = getRangeResource(offer.getResourcesList, "ports")
-
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
- s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
- reason.map(r => s" (reason: $r)").getOrElse(""))
-
- refuseSeconds match {
- case Some(seconds) =>
- val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
- d.declineOffer(offer.getId, filters)
- case _ => d.declineOffer(offer.getId)
- }
- }
-
- /**
- * Launches executors on accepted offers, and declines unused offers. Executors are launched
- * round-robin on offers.
- *
- * @param d SchedulerDriver
- * @param offers Mesos offers that match attribute constraints
- */
- private def handleMatchedOffers(
- d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
- val tasks = buildMesosTasks(offers)
- for (offer <- offers) {
- val offerAttributes = toAttributeMap(offer.getAttributesList)
- val offerMem = getResource(offer.getResourcesList, "mem")
- val offerCpus = getResource(offer.getResourcesList, "cpus")
- val offerPorts = getRangeResource(offer.getResourcesList, "ports")
- val id = offer.getId.getValue
-
- if (tasks.contains(offer.getId)) { // accept
- val offerTasks = tasks(offer.getId)
-
- logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
- s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
- s" Launching ${offerTasks.size} Mesos tasks.")
-
- for (task <- offerTasks) {
- val taskId = task.getTaskId
- val mem = getResource(task.getResourcesList, "mem")
- val cpus = getResource(task.getResourcesList, "cpus")
- val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
-
- logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
- s" ports: $ports")
- }
-
- d.launchTasks(
- Collections.singleton(offer.getId),
- offerTasks.asJava)
- } else if (totalCoresAcquired >= maxCores) {
- // Reject an offer for a configurable amount of time to avoid starving other frameworks
- declineOffer(d, offer, Some("reached spark.cores.max"),
- Some(rejectOfferDurationForReachedMaxCores))
- } else {
- declineOffer(d, offer)
- }
- }
- }
-
- /**
- * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize
- * per-task memory and IO, tasks are round-robin assigned to offers.
- *
- * @param offers Mesos offers that match attribute constraints
- * @return A map from OfferID to a list of Mesos tasks to launch on that offer
- */
- private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
- // offerID -> tasks
- val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
-
- // offerID -> resources
- val remainingResources = mutable.Map(offers.map(offer =>
- (offer.getId.getValue, offer.getResourcesList)): _*)
-
- var launchTasks = true
-
- // TODO(mgummelt): combine offers for a single slave
- //
- // round-robin create executors on the available offers
- while (launchTasks) {
- launchTasks = false
-
- for (offer <- offers) {
- val slaveId = offer.getSlaveId.getValue
- val offerId = offer.getId.getValue
- val resources = remainingResources(offerId)
-
- if (canLaunchTask(slaveId, resources)) {
- // Create a task
- launchTasks = true
- val taskId = newMesosTaskId()
- val offerCPUs = getResource(resources, "cpus").toInt
-
- val taskCPUs = executorCores(offerCPUs)
- val taskMemory = executorMemory(sc)
-
- slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
-
- val (resourcesLeft, resourcesToUse) =
- partitionTaskResources(resources, taskCPUs, taskMemory)
-
- val taskBuilder = MesosTaskInfo.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
- .setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
- .setName("Task " + taskId)
-
- taskBuilder.addAllResources(resourcesToUse.asJava)
-
- sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
- image,
- sc.conf,
- taskBuilder.getContainerBuilder
- )
- }
-
- tasks(offer.getId) ::= taskBuilder.build()
- remainingResources(offerId) = resourcesLeft.asJava
- totalCoresAcquired += taskCPUs
- coresByTaskId(taskId) = taskCPUs
- }
- }
- }
- tasks.toMap
- }
-
- /** Extracts task needed resources from a list of available resources. */
- private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
- : (List[Resource], List[Resource]) = {
-
- // partition cpus & mem
- val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
- val (afterMemResources, memResourcesToUse) =
- partitionResources(afterCPUResources.asJava, "mem", taskMemory)
-
- // If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
- // on the same host. This essentially means one executor per host.
- // TODO: handle network isolator case
- val (nonPortResources, portResourcesToUse) =
- partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
-
- (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
- }
-
- private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
- val offerMem = getResource(resources, "mem")
- val offerCPUs = getResource(resources, "cpus").toInt
- val cpus = executorCores(offerCPUs)
- val mem = executorMemory(sc)
- val ports = getRangeResource(resources, "ports")
- val meetsPortRequirements = checkPorts(sc.conf, ports)
-
- cpus > 0 &&
- cpus <= offerCPUs &&
- cpus + totalCoresAcquired <= maxCores &&
- mem <= offerMem &&
- numExecutors() < executorLimit &&
- slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
- meetsPortRequirements
- }
-
- private def executorCores(offerCPUs: Int): Int = {
- sc.conf.getInt("spark.executor.cores",
- math.min(offerCPUs, maxCores - totalCoresAcquired))
- }
-
- override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
- val taskId = status.getTaskId.getValue
- val slaveId = status.getSlaveId.getValue
- val state = TaskState.fromMesos(status.getState)
-
- logInfo(s"Mesos task $taskId is now ${status.getState}")
-
- stateLock.synchronized {
- val slave = slaves(slaveId)
-
- // If the shuffle service is enabled, have the driver register with each one of the
- // shuffle services. This allows the shuffle services to clean up state associated with
- // this application when the driver exits. There is currently not a great way to detect
- // this through Mesos, since the shuffle services are set up independently.
- if (state.equals(TaskState.RUNNING) &&
- shuffleServiceEnabled &&
- !slave.shuffleRegistered) {
- assume(mesosExternalShuffleClient.isDefined,
- "External shuffle client was not instantiated even though shuffle service is enabled.")
- // TODO: Remove this and allow the MesosExternalShuffleService to detect
- // framework termination when new Mesos Framework HTTP API is available.
- val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
-
- logDebug(s"Connecting to shuffle service on slave $slaveId, " +
- s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
-
- mesosExternalShuffleClient.get
- .registerDriverWithShuffleService(
- slave.hostname,
- externalShufflePort,
- sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
- s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
- sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
- slave.shuffleRegistered = true
- }
-
- if (TaskState.isFinished(state)) {
- // Remove the cores we have remembered for this task, if it's in the hashmap
- for (cores <- coresByTaskId.get(taskId)) {
- totalCoresAcquired -= cores
- coresByTaskId -= taskId
- }
- // If it was a failure, mark the slave as failed for blacklisting purposes
- if (TaskState.isFailed(state)) {
- slave.taskFailures += 1
-
- if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
- logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
- "is Spark installed on it?")
- }
- }
- executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
- // In case we'd rejected everything before but have now lost a node
- d.reviveOffers()
- }
- }
- }
-
- override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
- logError(s"Mesos error: $message")
- scheduler.error(message)
- }
-
- override def stop() {
- // Make sure we're not launching tasks during shutdown
- stateLock.synchronized {
- if (stopCalled) {
- logWarning("Stop called multiple times, ignoring")
- return
- }
- stopCalled = true
- super.stop()
- }
-
- // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
- // See SPARK-12330
- val startTime = System.nanoTime()
-
- // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
- while (numExecutors() > 0 &&
- System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) {
- Thread.sleep(100)
- }
-
- if (numExecutors() > 0) {
- logWarning(s"Timed out waiting for ${numExecutors()} remaining executors "
- + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files "
- + "on the mesos nodes.")
- }
-
- // Close the mesos external shuffle client if used
- mesosExternalShuffleClient.foreach(_.close())
-
- if (mesosDriver != null) {
- mesosDriver.stop()
- }
- }
-
- override def frameworkMessage(
- d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
- /**
- * Called when a slave is lost or a Mesos task finished. Updates local view on
- * what tasks are running. It also notifies the driver that an executor was removed.
- */
- private def executorTerminated(
- d: org.apache.mesos.SchedulerDriver,
- slaveId: String,
- taskId: String,
- reason: String): Unit = {
- stateLock.synchronized {
- // Do not call removeExecutor() after this scheduler backend was stopped because
- // removeExecutor() internally will send a message to the driver endpoint but
- // the driver endpoint is not available now, otherwise an exception will be thrown.
- if (!stopCalled) {
- removeExecutor(taskId, SlaveLost(reason))
- }
- slaves(slaveId).taskIDs.remove(taskId)
- }
- }
-
- override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = {
- logInfo(s"Mesos slave lost: ${slaveId.getValue}")
- }
-
- override def executorLost(
- d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
- logInfo("Mesos executor lost: %s".format(e.getValue))
- }
-
- override def applicationId(): String =
- Option(appId).getOrElse {
- logWarning("Application ID is not initialized yet.")
- super.applicationId
- }
-
- override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- // We don't truly know if we can fulfill the full amount of executors
- // since at coarse grain it depends on the amount of slaves available.
- logInfo("Capping the total amount of executors to " + requestedTotal)
- executorLimitOption = Some(requestedTotal)
- true
- }
-
- override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- if (mesosDriver == null) {
- logWarning("Asked to kill executors before the Mesos driver was started.")
- false
- } else {
- for (executorId <- executorIds) {
- val taskId = TaskID.newBuilder().setValue(executorId).build()
- mesosDriver.killTask(taskId)
- }
- // no need to adjust `executorLimitOption` since the AllocationManager already communicated
- // the desired limit through a call to `doRequestTotalExecutors`.
- // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
- true
- }
- }
-
- private def numExecutors(): Int = {
- slaves.values.map(_.taskIDs.size).sum
- }
-}
-
-private class Slave(val hostname: String) {
- val taskIDs = new mutable.HashSet[String]()
- var taskFailures = 0
- var shuffleRegistered = false
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
deleted file mode 100644
index f1e48fa..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.io.File
-import java.util.{ArrayList => JArrayList, Collections, List => JList}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{HashMap, HashSet}
-
-import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.protobuf.ByteString
-
-import org.apache.spark.{SparkContext, SparkException, TaskState}
-import org.apache.spark.executor.MesosExecutorBackend
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.util.Utils
-
-/**
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
- */
-private[spark] class MesosFineGrainedSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext,
- master: String)
- extends SchedulerBackend
- with org.apache.mesos.Scheduler
- with MesosSchedulerUtils {
-
- // Stores the slave ids that has launched a Mesos executor.
- val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
- val taskIdToSlaveId = new HashMap[Long, String]
-
- // An ExecutorInfo for our tasks
- var execArgs: Array[Byte] = null
-
- var classLoader: ClassLoader = null
-
- // The listener bus to publish executor added/removed events.
- val listenerBus = sc.listenerBus
-
- private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
-
- // Offer constraints
- private[this] val slaveOfferConstraints =
- parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
-
- // reject offers with mismatched constraints in seconds
- private val rejectOfferDurationForUnmetConstraints =
- getRejectOfferDurationForUnmetConstraints(sc)
-
- @volatile var appId: String = _
-
- override def start() {
- classLoader = Thread.currentThread.getContextClassLoader
- val driver = createSchedulerDriver(
- master,
- MesosFineGrainedSchedulerBackend.this,
- sc.sparkUser,
- sc.appName,
- sc.conf,
- sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
- Option.empty,
- Option.empty,
- sc.conf.getOption("spark.mesos.driver.frameworkId")
- )
-
- unsetFrameworkID(sc)
- startScheduler(driver)
- }
-
- /**
- * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
- * @param availableResources Available resources that is offered by Mesos
- * @param execId The executor id to assign to this new executor.
- * @return A tuple of the new mesos executor info and the remaining available resources.
- */
- def createExecutorInfo(
- availableResources: JList[Resource],
- execId: String): (MesosExecutorInfo, JList[Resource]) = {
- val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
- .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
- .getOrElse {
- throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
- }
- val environment = Environment.newBuilder()
- sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
- environment.addVariables(
- Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
- }
- val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
-
- val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
- Utils.libraryPathEnvPrefix(Seq(p))
- }.getOrElse("")
-
- environment.addVariables(
- Environment.Variable.newBuilder()
- .setName("SPARK_EXECUTOR_OPTS")
- .setValue(extraJavaOpts)
- .build())
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val uri = sc.conf.getOption("spark.executor.uri")
- .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
-
- val executorBackendName = classOf[MesosExecutorBackend].getName
- if (uri.isEmpty) {
- val executorPath = new File(executorSparkHome, "/bin/spark-class").getPath
- command.setValue(s"$prefixEnv $executorPath $executorBackendName")
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.get.split('/').last.split('.').head
- command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
- }
- val builder = MesosExecutorInfo.newBuilder()
- val (resourcesAfterCpu, usedCpuResources) =
- partitionResources(availableResources, "cpus", mesosExecutorCores)
- val (resourcesAfterMem, usedMemResources) =
- partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))
-
- builder.addAllResources(usedCpuResources.asJava)
- builder.addAllResources(usedMemResources.asJava)
-
- sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
-
- val executorInfo = builder
- .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
- .setCommand(command)
- .setData(ByteString.copyFrom(createExecArg()))
-
- sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
- image,
- sc.conf,
- executorInfo.getContainerBuilder()
- )
- }
-
- (executorInfo.build(), resourcesAfterMem.asJava)
- }
-
- /**
- * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
- * containing all the spark.* system properties in the form of (String, String) pairs.
- */
- private def createExecArg(): Array[Byte] = {
- if (execArgs == null) {
- val props = new HashMap[String, String]
- for ((key, value) <- sc.conf.getAll) {
- props(key) = value
- }
- // Serialize the map as an array of (String, String) pairs
- execArgs = Utils.serialize(props.toArray)
- }
- execArgs
- }
-
- override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
-
- override def registered(
- d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- inClassLoader() {
- appId = frameworkId.getValue
- logInfo("Registered as framework ID " + appId)
- markRegistered()
- }
- }
-
- private def inClassLoader()(fun: => Unit) = {
- val oldClassLoader = Thread.currentThread.getContextClassLoader
- Thread.currentThread.setContextClassLoader(classLoader)
- try {
- fun
- } finally {
- Thread.currentThread.setContextClassLoader(oldClassLoader)
- }
- }
-
- override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
-
- override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
-
- private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
- val builder = new StringBuilder
- tasks.asScala.foreach { t =>
- builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
- .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
- .append("Task resources: ").append(t.getResourcesList).append("\n")
- .append("Executor resources: ").append(t.getExecutor.getResourcesList)
- .append("---------------------------------------------\n")
- }
- builder.toString()
- }
-
- /**
- * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
- * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
- * tasks are balanced across the cluster.
- *
- * Processing happens in stages:
- * 1. offers whose attributes do not satisfy `slaveOfferConstraints` are declined with a
- * refuse filter of `rejectOfferDurationForUnmetConstraints` seconds;
- * 2. remaining offers without enough memory/CPU are declined immediately;
- * 3. usable offers are handed to the task scheduler as WorkerOffers, and the tasks it returns
- * are launched on the matching offer;
- * 4. usable offers from slaves that received no task are declined.
- *
- * The whole body runs inside `inClassLoader()`.
- */
- override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
- inClassLoader() {
- // Fail first on offers with unmet constraints
- val (offersMatchingConstraints, offersNotMatchingConstraints) =
- offers.asScala.partition { o =>
- val offerAttributes = toAttributeMap(o.getAttributesList)
- val meetsConstraints =
- matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
-
- // add some debug messaging
- if (!meetsConstraints) {
- val id = o.getId.getValue
- logDebug(s"Declining offer: $id with attributes: $offerAttributes")
- }
-
- meetsConstraints
- }
-
- // These offers do not meet constraints. We don't need to see them again.
- // Decline the offer for a long period of time.
- offersNotMatchingConstraints.foreach { o =>
- d.declineOffer(o.getId, Filters.newBuilder()
- .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
- }
-
- // Of the matching constraints, see which ones give us enough memory and cores
- val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
- val mem = getResource(o.getResourcesList, "mem")
- val cpus = getResource(o.getResourcesList, "cpus")
- val slaveId = o.getSlaveId.getValue
- val offerAttributes = toAttributeMap(o.getAttributesList)
-
- // check offers for
- // 1. Memory requirements
- // 2. CPU requirements - mesosExecutorCores for a new executor plus CPUS_PER_TASK for one task
- // An offer from a slave that already has an executor recorded in slaveIdToExecutorInfo
- // only needs CPUS_PER_TASK cpus; memory is not re-checked in that case.
- val meetsMemoryRequirements = mem >= executorMemory(sc)
- val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
- val meetsRequirements =
- (meetsMemoryRequirements && meetsCPURequirements) ||
- (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
- val debugstr = if (meetsRequirements) "Accepting" else "Declining"
- logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
- + s"$offerAttributes mem: $mem cpu: $cpus")
-
- meetsRequirements
- }
-
- // Decline offers we ruled out immediately
- unUsableOffers.foreach(o => d.declineOffer(o.getId))
-
- // Turn each usable offer into a WorkerOffer (slave ID, hostname, integral cpu count)
- // for the task scheduler.
- val workerOffers = usableOffers.map { o =>
- val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
- getResource(o.getResourcesList, "cpus").toInt
- } else {
- // If the Mesos executor has not been started on this slave yet, set aside a few
- // cores for the Mesos executor by offering fewer cores to the Spark executor
- (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
- }
- new WorkerOffer(
- o.getSlaveId.getValue,
- o.getHostname,
- cpus)
- }
-
- val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
- // NOTE(review): the WorkerOffers above are built with the slave ID as their first argument,
- // so keying by `executorId` here appears to rely on executor ID == slave ID in this backend.
- val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
- // Resources still available per slave; replaced after each task is carved out below.
- val slaveIdToResources = new HashMap[String, JList[Resource]]()
- usableOffers.foreach { o =>
- slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
- }
-
- // Mesos tasks to launch, grouped by slave ID.
- val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
-
- val slavesIdsOfAcceptedOffers = HashSet[String]()
-
- // Call into the TaskSchedulerImpl
- val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
- acceptedOffers
- .foreach { offer =>
- offer.foreach { taskDesc =>
- val slaveId = taskDesc.executorId
- slavesIdsOfAcceptedOffers += slaveId
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- // createMesosTask may also create and record the executor for this slave; the
- // resources it returns replace the slave's entry before the next task is placed.
- val (mesosTask, remainingResources) = createMesosTask(
- taskDesc,
- slaveIdToResources(slaveId),
- slaveId)
- mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
- .add(mesosTask)
- slaveIdToResources(slaveId) = remainingResources
- }
- }
-
- // Reply to the offers
- // Every launch below passes a filter with refuse_seconds = 1.
- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
-
- // NOTE(review): an ExecutorAdded event is posted for every slave that receives tasks in
- // this round, not only when its executor is first created; confirm listeners tolerate it.
- mesosTasks.foreach { case (slaveId, tasks) =>
- slaveIdToWorkerOffer.get(slaveId).foreach(o =>
- listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
- // TODO: Add support for log urls for Mesos
- new ExecutorInfo(o.host, o.cores, Map.empty)))
- )
- logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
- d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
- }
-
- // Decline offers that weren't used
- // NOTE: This logic assumes that we only get a single offer for each host in a given batch
- // NOTE(review): slaveIdToOffer (built with toMap) keeps only the last offer per slave, so any
- // other offer from a slave that did receive tasks is neither launched on nor declined.
- for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
- d.declineOffer(o.getId)
- }
- }
- }
-
-  /**
-   * Turns a Spark [[TaskDescription]] into a Mesos task for `slaveId`, carving the task's
-   * CPUs out of the given resources.
-   *
-   * If no executor is recorded yet for the slave, one is created via `createExecutorInfo`,
-   * which also supplies the resources left for the task. Either way the executor info is
-   * (re)recorded in `slaveIdToExecutorInfo`.
-   *
-   * @return the Mesos task together with the resources not used by it
-   */
-  def createMesosTask(
-      task: TaskDescription,
-      resources: JList[Resource],
-      slaveId: String): (MesosTaskInfo, JList[Resource]) = {
-    val mesosTaskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val (executorInfo, afterExecutor) = slaveIdToExecutorInfo.get(slaveId) match {
-      case Some(existing) => (existing, resources)
-      case None => createExecutorInfo(resources, slaveId)
-    }
-    slaveIdToExecutorInfo(slaveId) = executorInfo
-
-    val (leftover, taskCpus) =
-      partitionResources(afterExecutor, "cpus", scheduler.CPUS_PER_TASK)
-    val mesosSlaveId = SlaveID.newBuilder().setValue(slaveId).build()
-    val launchData = MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString
-
-    val taskInfo = MesosTaskInfo.newBuilder()
-      .setTaskId(mesosTaskId)
-      .setSlaveId(mesosSlaveId)
-      .setExecutor(executorInfo)
-      .setName(task.name)
-      .addAllResources(taskCpus.asJava)
-      .setData(launchData)
-      .build()
-    (taskInfo, leftover.asJava)
-  }
-
-  /**
-   * Mesos callback for a task status change.
-   *
-   * If the task failed and its slave is known, the executor on that slave is removed. Once
-   * the task reaches a finished state its task-to-slave mapping is dropped. The update is
-   * then forwarded to the task scheduler.
-   */
-  override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus): Unit = {
-    inClassLoader() {
-      val tid = status.getTaskId.getValue.toLong
-      val state = TaskState.fromMesos(status.getState)
-      synchronized {
-        // Reuse the converted `state` rather than converting the Mesos state a second time.
-        if (TaskState.isFailed(state) && taskIdToSlaveId.contains(tid)) {
-          // We lost the executor on this slave, so remember that it's gone
-          removeExecutor(taskIdToSlaveId(tid), "Lost executor")
-        }
-        if (TaskState.isFinished(state)) {
-          taskIdToSlaveId.remove(tid)
-        }
-      }
-      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
-    }
-  }
-
-  /**
-   * Mesos callback reporting a driver error: logs it, marks the backend as errored and
-   * passes the message on to the task scheduler.
-   */
-  override def error(d: org.apache.mesos.SchedulerDriver, message: String): Unit =
-    inClassLoader() {
-      logError(s"Mesos error: $message")
-      markErr()
-      scheduler.error(message)
-    }
-
-  /** Stops the Mesos driver if one has been created; does nothing otherwise. */
-  override def stop(): Unit = Option(mesosDriver).foreach(_.stop())
-
-  /** Asks the Mesos driver to revive resource offers for this framework. */
-  override def reviveOffers(): Unit = {
-    val driver = mesosDriver
-    driver.reviveOffers()
-  }
-
-  /** Mesos callback for an executor-to-framework message; ignored by this backend. */
-  override def frameworkMessage(
-      d: org.apache.mesos.SchedulerDriver,
-      e: ExecutorID,
-      s: SlaveID,
-      b: Array[Byte]): Unit = ()
-
-  /**
-   * Announces the removal of the executor on `slaveId` on the listener bus and forgets its
-   * executor info. Both steps run while holding this backend's lock.
-   */
-  private def removeExecutor(slaveId: String, reason: String) = synchronized {
-    val removedAt = System.currentTimeMillis()
-    listenerBus.post(SparkListenerExecutorRemoved(removedAt, slaveId, reason))
-    slaveIdToExecutorInfo -= slaveId
-  }
-
-  /**
-   * Handles the loss of a slave: logs it, removes its executor (posting a removal event) and
-   * reports the executor as lost to the task scheduler with the given `reason`.
-   */
-  private def recordSlaveLost(
-      d: org.apache.mesos.SchedulerDriver,
-      slaveId: SlaveID,
-      reason: ExecutorLossReason): Unit = inClassLoader() {
-    val lostSlave = slaveId.getValue
-    logInfo("Mesos slave lost: " + lostSlave)
-    removeExecutor(lostSlave, reason.toString)
-    scheduler.executorLost(lostSlave, reason)
-  }
-
-  /** Mesos callback for a lost slave; recorded with a `SlaveLost` loss reason. */
-  override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = {
-    val reason = SlaveLost()
-    recordSlaveLost(d, slaveId, reason)
-  }
-
-  /**
-   * Mesos callback for an exited executor. The whole slave is recorded as lost, using the
-   * executor's exit `status` as an app-caused `ExecutorExited` reason.
-   */
-  override def executorLost(
-      d: org.apache.mesos.SchedulerDriver,
-      executorId: ExecutorID,
-      slaveId: SlaveID,
-      status: Int): Unit = {
-    logInfo(s"Executor lost: ${executorId.getValue}, marking slave ${slaveId.getValue} as lost")
-    val reason = ExecutorExited(status, exitCausedByApp = true)
-    recordSlaveLost(d, slaveId, reason)
-  }
-
-  /** Asks Mesos to kill `taskId`; `executorId` and `interruptThread` are not used here. */
-  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
-    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
-    mesosDriver.killTask(mesosTaskId)
-  }
-
-  /**
-   * Reads `spark.default.parallelism` from the Spark conf, falling back to 8.
-   * TODO: query Mesos for number of cores
-   */
-  override def defaultParallelism(): Int = {
-    val fallbackParallelism = 8
-    sc.conf.getInt("spark.default.parallelism", fallbackParallelism)
-  }
-
-  /**
-   * The framework ID stored by `registered`. Before registration it logs a warning and
-   * falls back to the parent implementation's ID.
-   */
-  override def applicationId(): String = appId match {
-    case null =>
-      logWarning("Application ID is not initialized yet.")
-      super.applicationId
-    case registeredId => registeredId
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
deleted file mode 100644
index 3fe0674..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
-import org.apache.mesos.Protos.ContainerInfo.DockerInfo
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.internal.Logging
-
-/**
- * A collection of utility functions which can be used by both the
- * MesosSchedulerBackend and the [[MesosFineGrainedSchedulerBackend]].
- */
-private[mesos] object MesosSchedulerBackendUtil extends Logging {
-  /**
-   * Parse a comma-delimited list of volume specs, each of which
-   * takes the form [host-dir:]container-dir[:rw|:ro].
-   *
-   * Volumes are read-write unless `:ro` is given. Entries matching none of the accepted
-   * forms are dropped after a warning is logged.
-   *
-   * @param volumes raw spec string, e.g. "/data:/mnt/data:ro,/tmp/scratch"
-   * @return the parsed volumes, in input order
-   */
-  def parseVolumesSpec(volumes: String): List[Volume] = {
-    volumes.split(",").map(_.split(":")).flatMap { spec =>
-      val vol: Volume.Builder = Volume
-        .newBuilder()
-        .setMode(Volume.Mode.RW)
-      spec match {
-        case Array(container_path) =>
-          Some(vol.setContainerPath(container_path))
-        case Array(container_path, "rw") =>
-          Some(vol.setContainerPath(container_path))
-        case Array(container_path, "ro") =>
-          Some(vol.setContainerPath(container_path)
-            .setMode(Volume.Mode.RO))
-        case Array(host_path, container_path) =>
-          Some(vol.setContainerPath(container_path)
-            .setHostPath(host_path))
-        case Array(host_path, container_path, "rw") =>
-          Some(vol.setContainerPath(container_path)
-            .setHostPath(host_path))
-        case Array(host_path, container_path, "ro") =>
-          Some(vol.setContainerPath(container_path)
-            .setHostPath(host_path)
-            .setMode(Volume.Mode.RO))
-        case _ =>
-          logWarning(s"Unable to parse volume specs: $volumes. "
-            + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
-          None
-      }
-    }
-    .map { _.build() }
-    .toList
-  }
-
-  /**
-   * Parse a comma-delimited list of port mapping specs, each of which
-   * takes the form host_port:container_port[:udp|:tcp]
-   *
-   * Note:
-   * the docker form is [ip:]host_port:container_port, but the DockerInfo
-   * message has no field for 'ip', and instead has a 'protocol' field.
-   * Docker itself only appears to support TCP, so this alternative form
-   * anticipates the expansion of the docker form to allow for a protocol
-   * and leaves open the chance for mesos to begin to accept an 'ip' field
-   *
-   * The protocol defaults to "tcp". Port numbers may be surrounded by whitespace
-   * (e.g. "80:8080, 443:8443"). Entries with the wrong number of fields or non-numeric
-   * ports are dropped after a warning is logged, instead of aborting the whole parse.
-   *
-   * @param portmaps raw spec string
-   * @return the parsed port mappings, in input order
-   */
-  def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = {
-    // Logs the shared "unparsable" warning and yields no mapping for the offending entry.
-    def skipMalformed(): Option[DockerInfo.PortMapping.Builder] = {
-      logWarning(s"Unable to parse port mapping specs: $portmaps. "
-        + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"")
-      None
-    }
-
-    portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] =>
-      val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping
-        .newBuilder()
-        .setProtocol("tcp")
-      try {
-        spec match {
-          case Array(host_port, container_port) =>
-            Some(portmap.setHostPort(host_port.trim.toInt)
-              .setContainerPort(container_port.trim.toInt))
-          case Array(host_port, container_port, protocol) =>
-            Some(portmap.setHostPort(host_port.trim.toInt)
-              .setContainerPort(container_port.trim.toInt)
-              .setProtocol(protocol))
-          case _ =>
-            skipMalformed()
-        }
-      } catch {
-        // A non-numeric port is as malformed as a wrong field count: warn and skip the
-        // entry rather than letting NumberFormatException abort the whole parse.
-        case _: NumberFormatException => skipMalformed()
-      }
-    }
-    .map { _.build() }
-    .toList
-  }
-
-  /**
-   * Construct a DockerInfo structure and insert it into a ContainerInfo
-   *
-   * With containerizer "docker" a DockerInfo (image, force-pull flag, port mappings) is set.
-   * With "mesos" a MesosInfo carrying a Docker image is set, cached unless `forcePullImage`.
-   * Any volumes are added to `container` in both cases.
-   *
-   * @throws SparkException if `containerizer` is neither "docker" nor "mesos"
-   */
-  def addDockerInfo(
-      container: ContainerInfo.Builder,
-      image: String,
-      containerizer: String,
-      forcePullImage: Boolean = false,
-      volumes: Option[List[Volume]] = None,
-      portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
-
-    containerizer match {
-      case "docker" =>
-        container.setType(ContainerInfo.Type.DOCKER)
-        val docker = ContainerInfo.DockerInfo.newBuilder()
-          .setImage(image)
-          .setForcePullImage(forcePullImage)
-        // TODO (mgummelt): Remove this. Portmaps have no effect,
-        //                  as we don't support bridge networking.
-        portmaps.foreach(_.foreach(docker.addPortMappings))
-        container.setDocker(docker)
-      case "mesos" =>
-        container.setType(ContainerInfo.Type.MESOS)
-        val imageProto = Image.newBuilder()
-          .setType(Image.Type.DOCKER)
-          .setDocker(Image.Docker.newBuilder().setName(image))
-          .setCached(!forcePullImage)
-        container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
-      case _ =>
-        throw new SparkException(
-          "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
-    }
-
-    volumes.foreach(_.foreach(container.addVolumes))
-  }
-
-  /**
-   * Setup a docker containerizer from MesosDriverDescription scheduler properties
-   *
-   * Reads `spark.mesos.executor.docker.{forcePullImage,volumes,portmaps}` and
-   * `spark.mesos.containerizer` (default "docker") from `conf`.
-   */
-  def setupContainerBuilderDockerInfo(
-      imageName: String,
-      conf: SparkConf,
-      builder: ContainerInfo.Builder): Unit = {
-    // Accept "true" in any letter case, like other boolean Spark settings.
-    val forcePullImage = conf
-      .getOption("spark.mesos.executor.docker.forcePullImage")
-      .exists(_.equalsIgnoreCase("true"))
-    val volumes = conf
-      .getOption("spark.mesos.executor.docker.volumes")
-      .map(parseVolumesSpec)
-    val portmaps = conf
-      .getOption("spark.mesos.executor.docker.portmaps")
-      .map(parsePortMappingsSpec)
-
-    val containerizer = conf.get("spark.mesos.containerizer", "docker")
-    addDockerInfo(
-      builder,
-      imageName,
-      containerizer,
-      forcePullImage = forcePullImage,
-      volumes = volumes,
-      portmaps = portmaps)
-    logDebug("setupContainerDockerInfo: using docker image: " + imageName)
-  }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org