You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2016/08/26 19:25:41 UTC

[6/7] spark git commit: [SPARK-16967] move mesos to module
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
deleted file mode 100644
index bb6f6b3..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ /dev/null
@@ -1,745 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.mesos
-import java.util.{Collections, Date, List => JList}
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import org.apache.mesos.{Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
-import org.apache.mesos.Protos.Environment.Variable
-import org.apache.mesos.Protos.TaskStatus.Reason
-import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.Utils
- * Tracks the current state of a Mesos Task that runs a Spark driver.
- * @param driverDescription Submitted driver description from
- * [[]]
- * @param taskId Mesos TaskID generated for the task
- * @param slaveId Slave ID that the task is assigned to
- * @param mesosTaskStatus The last known task status update.
- * @param startDate The date the task was launched
- * @param finishDate The date the task finished
- * @param frameworkId Mesos framework ID the task registers with
- */
-private[spark] class MesosClusterSubmissionState(
-    val driverDescription: MesosDriverDescription,
-    val taskId: TaskID,
-    val slaveId: SlaveID,
-    var mesosTaskStatus: Option[TaskStatus],
-    var startDate: Date,
-    var finishDate: Option[Date],
-    val frameworkId: String)
-  extends Serializable {
-  def copy(): MesosClusterSubmissionState = {
-    new MesosClusterSubmissionState(
-      driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId)
-  }
- * Tracks the retry state of a driver, which includes the next time it should be scheduled
- * and necessary information to do exponential backoff.
- * This class is not thread-safe, and we expect the caller to handle synchronizing state.
- *
- * @param lastFailureStatus Last Task status when it failed.
- * @param retries Number of times it has been retried.
- * @param nextRetry Time at which it should be retried next
- * @param waitTime The amount of time driver is scheduled to wait until next retry.
- */
-private[spark] class MesosClusterRetryState(
-    val lastFailureStatus: TaskStatus,
-    val retries: Int,
-    val nextRetry: Date,
-    val waitTime: Int) extends Serializable {
-  def copy(): MesosClusterRetryState =
-    new MesosClusterRetryState(lastFailureStatus, retries, nextRetry, waitTime)
- * The full state of the cluster scheduler, currently being used for displaying
- * information on the UI.
- *
- * @param frameworkId Mesos Framework id for the cluster scheduler.
- * @param masterUrl The Mesos master url
- * @param queuedDrivers All drivers queued to be launched
- * @param launchedDrivers All launched or running drivers
- * @param finishedDrivers All terminated drivers
- * @param pendingRetryDrivers All drivers pending to be retried
- */
-private[spark] class MesosClusterSchedulerState(
-    val frameworkId: String,
-    val masterUrl: Option[String],
-    val queuedDrivers: Iterable[MesosDriverDescription],
-    val launchedDrivers: Iterable[MesosClusterSubmissionState],
-    val finishedDrivers: Iterable[MesosClusterSubmissionState],
-    val pendingRetryDrivers: Iterable[MesosDriverDescription])
- * The full state of a Mesos driver, that is being used to display driver information on the UI.
- */
-private[spark] class MesosDriverState(
-    val state: String,
-    val description: MesosDriverDescription,
-    val submissionState: Option[MesosClusterSubmissionState] = None)
- * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
- * as Mesos tasks in a Mesos cluster.
- * All drivers are launched asynchronously by the framework, which will eventually be launched
- * by one of the slaves in the cluster. The results of the driver will be stored in slave's task
- * sandbox which is accessible by visiting the Mesos UI.
- * This scheduler supports recovery by persisting all its state and performs task reconciliation
- * on recover, which gets all the latest state for all the drivers from Mesos master.
- */
-private[spark] class MesosClusterScheduler(
-    engineFactory: MesosClusterPersistenceEngineFactory,
-    conf: SparkConf)
-  extends Scheduler with MesosSchedulerUtils {
-  var frameworkUrl: String = _
-  private val metricsSystem =
-    MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
-  private val master = conf.get("spark.master")
-  private val appName = conf.get("")
-  private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
-  private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
-  private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
-  private val schedulerState = engineFactory.createEngine("scheduler")
-  private val stateLock = new Object()
-  private val finishedDrivers =
-    new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
-  private var frameworkId: String = null
-  // Holds all the launched drivers and current launch state, keyed by driver id.
-  private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
-  // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
-  // All drivers that are loaded after failover are added here, as we need get the latest
-  // state of the tasks from Mesos.
-  private val pendingRecover = new mutable.HashMap[String, SlaveID]()
-  // Stores all the submitted drivers that hasn't been launched.
-  private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
-  // All supervised drivers that are waiting to retry after termination.
-  private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
-  private val queuedDriversState = engineFactory.createEngine("driverQueue")
-  private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
-  private val pendingRetryDriversState = engineFactory.createEngine("retryList")
-  // Flag to mark if the scheduler is ready to be called, which is until the scheduler
-  // is registered with Mesos master.
-  @volatile protected var ready = false
-  private var masterInfo: Option[MasterInfo] = None
-  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
-    val c = new CreateSubmissionResponse
-    if (!ready) {
-      c.success = false
-      c.message = "Scheduler is not ready to take requests"
-      return c
-    }
-    stateLock.synchronized {
-      if (isQueueFull()) {
-        c.success = false
-        c.message = "Already reached maximum submission size"
-        return c
-      }
-      c.submissionId = desc.submissionId
-      queuedDriversState.persist(desc.submissionId, desc)
-      queuedDrivers += desc
-      c.success = true
-    }
-    c
-  }
-  def killDriver(submissionId: String): KillSubmissionResponse = {
-    val k = new KillSubmissionResponse
-    if (!ready) {
-      k.success = false
-      k.message = "Scheduler is not ready to take requests"
-      return k
-    }
-    k.submissionId = submissionId
-    stateLock.synchronized {
-      // We look for the requested driver in the following places:
-      // 1. Check if submission is running or launched.
-      // 2. Check if it's still queued.
-      // 3. Check if it's in the retry list.
-      // 4. Check if it has already completed.
-      if (launchedDrivers.contains(submissionId)) {
-        val task = launchedDrivers(submissionId)
-        mesosDriver.killTask(task.taskId)
-        k.success = true
-        k.message = "Killing running driver"
-      } else if (removeFromQueuedDrivers(submissionId)) {
-        k.success = true
-        k.message = "Removed driver while it's still pending"
-      } else if (removeFromPendingRetryDrivers(submissionId)) {
-        k.success = true
-        k.message = "Removed driver while it's being retried"
-      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
-        k.success = false
-        k.message = "Driver already terminated"
-      } else {
-        k.success = false
-        k.message = "Cannot find driver"
-      }
-    }
-    k
-  }
-  def getDriverStatus(submissionId: String): SubmissionStatusResponse = {
-    val s = new SubmissionStatusResponse
-    if (!ready) {
-      s.success = false
-      s.message = "Scheduler is not ready to take requests"
-      return s
-    }
-    s.submissionId = submissionId
-    stateLock.synchronized {
-      if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
-        s.success = true
-        s.driverState = "QUEUED"
-      } else if (launchedDrivers.contains(submissionId)) {
-        s.success = true
-        s.driverState = "RUNNING"
-        launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
-      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
-        s.success = true
-        s.driverState = "FINISHED"
-        finishedDrivers
-          .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
-          .foreach(state => s.message = state.toString)
-      } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
-        val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
-          .get.retryState.get.lastFailureStatus
-        s.success = true
-        s.driverState = "RETRYING"
-        s.message = status.toString
-      } else {
-        s.success = false
-        s.driverState = "NOT_FOUND"
-      }
-    }
-    s
-  }
-  /**
-   * Gets the driver state to be displayed on the Web UI.
-   */
-  def getDriverState(submissionId: String): Option[MesosDriverState] = {
-    stateLock.synchronized {
-      queuedDrivers.find(_.submissionId.equals(submissionId))
-        .map(d => new MesosDriverState("QUEUED", d))
-        .orElse(launchedDrivers.get(submissionId)
-          .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d))))
-        .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
-          .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d))))
-        .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId))
-          .map(d => new MesosDriverState("RETRYING", d)))
-    }
-  }
-  private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity
-  /**
-   * Recover scheduler state that is persisted.
-   * We still need to do task reconciliation to be up to date of the latest task states
-   * as it might have changed while the scheduler is failing over.
-   */
-  private def recoverState(): Unit = {
-    stateLock.synchronized {
-      launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
-        pendingRecover(state.taskId.getValue) = state.slaveId
-      }
-      queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
-      // There is potential timing issue where a queued driver might have been launched
-      // but the scheduler shuts down before the queued driver was able to be removed
-      // from the queue. We try to mitigate this issue by walking through all queued drivers
-      // and remove if they're already launched.
-      queuedDrivers
-        .filter(d => launchedDrivers.contains(d.submissionId))
-        .foreach(d => removeFromQueuedDrivers(d.submissionId))
-      pendingRetryDriversState.fetchAll[MesosDriverDescription]()
-        .foreach(s => pendingRetryDrivers += s)
-      // TODO: Consider storing finished drivers so we can show them on the UI after
-      // failover. For now we clear the history on each recovery.
-      finishedDrivers.clear()
-    }
-  }
-  /**
-   * Starts the cluster scheduler and wait until the scheduler is registered.
-   * This also marks the scheduler to be ready for requests.
-   */
-  def start(): Unit = {
-    // TODO: Implement leader election to make sure only one framework running in the cluster.
-    val fwId = schedulerState.fetch[String]("frameworkId")
-    fwId.foreach { id =>
-      frameworkId = id
-    }
-    recoverState()
-    metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
-    metricsSystem.start()
-    val driver = createSchedulerDriver(
-      master,
-      MesosClusterScheduler.this,
-      Utils.getCurrentUserName(),
-      appName,
-      conf,
-      Some(frameworkUrl),
-      Some(true),
-      Some(Integer.MAX_VALUE),
-      fwId)
-    startScheduler(driver)
-    ready = true
-  }
-  def stop(): Unit = {
-    ready = false
-    metricsSystem.stop()
-    mesosDriver.stop(true)
-  }
-  override def registered(
-      driver: SchedulerDriver,
-      newFrameworkId: FrameworkID,
-      masterInfo: MasterInfo): Unit = {
-    logInfo("Registered as framework ID " + newFrameworkId.getValue)
-    if (newFrameworkId.getValue != frameworkId) {
-      frameworkId = newFrameworkId.getValue
-      schedulerState.persist("frameworkId", frameworkId)
-    }
-    markRegistered()
-    stateLock.synchronized {
-      this.masterInfo = Some(masterInfo)
-      if (!pendingRecover.isEmpty) {
-        // Start task reconciliation if we need to recover.
-        val statuses = pendingRecover.collect {
-          case (taskId, slaveId) =>
-            val newStatus = TaskStatus.newBuilder()
-              .setTaskId(TaskID.newBuilder().setValue(taskId).build())
-              .setSlaveId(slaveId)
-              .setState(MesosTaskState.TASK_STAGING)
-              .build()
-            launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus))
-              .getOrElse(newStatus)
-        }
-        // TODO: Page the status updates to avoid trying to reconcile
-        // a large amount of tasks at once.
-        driver.reconcileTasks(statuses.toSeq.asJava)
-      }
-    }
-  }
-  private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
-    desc.conf.getOption("spark.executor.uri")
-      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
-  }
-  private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
-    s"${frameworkId}-${desc.submissionId}"
-  }
-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
-    m.updated(k, f(m.getOrElse(k, default)))
-  }
-  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
-    // TODO(mgummelt): Don't do this here.  This should be passed as a --conf
-    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
-      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
-    )
-    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
-    val envBuilder = Environment.newBuilder()
-    env.foreach { case (k, v) =>
-      envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
-    }
-  }
-  private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
-    val confUris = List(conf.getOption("spark.mesos.uris"),
-      desc.conf.getOption("spark.mesos.uris"),
-      desc.conf.getOption("spark.submit.pyFiles")).flatMap(
-    ).flatten
-    val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
-    ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
-      CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
-  }
-  private def getDriverCommandValue(desc: MesosDriverDescription): String = {
-    val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
-    val executorUri = getDriverExecutorURI(desc)
-    // Gets the path to run spark-submit, and the path to the Mesos sandbox.
-    val (executable, sandboxPath) = if (dockerDefined) {
-      // Application jar is automatically downloaded in the mounted sandbox by Mesos,
-      // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
-      ("./bin/spark-submit", "$MESOS_SANDBOX")
-    } else if (executorUri.isDefined) {
-      val folderBasename = executorUri.get.split('/').last.split('.').head
-      val entries = conf.getOption("spark.executor.extraLibraryPath")
-        .map(path => Seq(path) ++ desc.command.libraryPathEntries)
-        .getOrElse(desc.command.libraryPathEntries)
-      val prefixEnv = if (!entries.isEmpty) Utils.libraryPathEnvPrefix(entries) else ""
-      val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
-      // Sandbox path points to the parent folder as we chdir into the folderBasename.
-      (cmdExecutable, "..")
-    } else {
-      val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
-        .orElse(conf.getOption("spark.home"))
-        .orElse(Option(System.getenv("SPARK_HOME")))
-        .getOrElse {
-          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
-        }
-      val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath
-      // Sandbox points to the current directory by default with Mesos.
-      (cmdExecutable, ".")
-    }
-    val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
-    val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
-    val appArguments = desc.command.arguments.mkString(" ")
-    s"$executable $cmdOptions $primaryResource $appArguments"
-  }
-  private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
-    val builder = CommandInfo.newBuilder()
-    builder.setValue(getDriverCommandValue(desc))
-    builder.setEnvironment(getDriverEnvironment(desc))
-    builder.addAllUris(getDriverUris(desc).asJava)
-  }
-  private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
-    var options = Seq(
-      "--name", desc.conf.get(""),
-      "--master", s"mesos://${conf.get("spark.master")}",
-      "--driver-cores", desc.cores.toString,
-      "--driver-memory", s"${desc.mem}M")
-    // Assume empty main class means we're running python
-    if (!desc.command.mainClass.equals("")) {
-      options ++= Seq("--class", desc.command.mainClass)
-    }
-    desc.conf.getOption("spark.executor.memory").foreach { v =>
-      options ++= Seq("--executor-memory", v)
-    }
-    desc.conf.getOption("spark.cores.max").foreach { v =>
-      options ++= Seq("--total-executor-cores", v)
-    }
-    desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
-      val formattedFiles = pyFiles.split(",")
-        .map { path => new File(sandboxPath, path.split("/").last).toString() }
-        .mkString(",")
-      options ++= Seq("--py-files", formattedFiles)
-    }
-    // --conf
-    val replicatedOptionsBlacklist = Set(
-      "spark.jars", // Avoids duplicate classes in classpath
-      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
-      "spark.master" // this contains the address of the dispatcher, not master
-    )
-    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
-    val driverConf = desc.conf.getAll
-      .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
-      .toMap
-    (defaultConf ++ driverConf).foreach { case (key, value) =>
-      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
-    options
-  }
-  /**
-   * Escape args for Unix-like shells, unless already quoted by the user.
-   * Based on:
-   * and
- *
-   * @param value argument
-   * @return escaped argument
-   */
-  private[scheduler] def shellEscape(value: String): String = {
-    val WrappedInQuotes = """^(".+"|'.+')$""".r
-    val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r
-    value match {
-      case WrappedInQuotes(c) => value // The user quoted his args, don't touch it!
-      case ShellSpecialChars(c) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\""
-      case _: String => value // Don't touch harmless strings
-    }
-  }
-  private class ResourceOffer(
-      val offerId: OfferID,
-      val slaveId: SlaveID,
-      var resources: JList[Resource]) {
-    override def toString(): String = {
-      s"Offer id: ${offerId}, resources: ${resources}"
-    }
-  }
-  private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
-    val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
-    val (remainingResources, cpuResourcesToUse) =
-      partitionResources(offer.resources, "cpus", desc.cores)
-    val (finalResources, memResourcesToUse) =
-      partitionResources(remainingResources.asJava, "mem", desc.mem)
-    offer.resources = finalResources.asJava
-    val appName = desc.conf.get("")
-    val taskInfo = TaskInfo.newBuilder()
-      .setTaskId(taskId)
-      .setName(s"Driver for ${appName}")
-      .setSlaveId(offer.slaveId)
-      .setCommand(buildDriverCommand(desc))
-      .addAllResources(cpuResourcesToUse.asJava)
-      .addAllResources(memResourcesToUse.asJava)
-    desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
-        desc.conf,
-        taskInfo.getContainerBuilder)
-    }
-  }
-  /**
-   * This method takes all the possible candidates and attempt to schedule them with Mesos offers.
-   * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
-   * logic on each task.
-   */
-  private def scheduleTasks(
-      candidates: Seq[MesosDriverDescription],
-      afterLaunchCallback: (String) => Boolean,
-      currentOffers: List[ResourceOffer],
-      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
-    for (submission <- candidates) {
-      val driverCpu = submission.cores
-      val driverMem = submission.mem
-      logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
-      val offerOption = currentOffers.find { o =>
-        getResource(o.resources, "cpus") >= driverCpu &&
-        getResource(o.resources, "mem") >= driverMem
-      }
-      if (offerOption.isEmpty) {
-        logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
-          s"cpu: $driverCpu, mem: $driverMem")
-      } else {
-        val offer = offerOption.get
-        val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
-        val task = createTaskInfo(submission, offer)
-        queuedTasks += task
-        logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
-          submission.submissionId)
-        val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
-          None, new Date(), None, getDriverFrameworkID(submission))
-        launchedDrivers(submission.submissionId) = newState
-        launchedDriversState.persist(submission.submissionId, newState)
-        afterLaunchCallback(submission.submissionId)
-      }
-    }
-  }
-  override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
-    logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
-    val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
-    val currentTime = new Date()
-    val currentOffers = {
-      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
-    }.toList
-    stateLock.synchronized {
-      // We first schedule all the supervised drivers that are ready to retry.
-      // This list will be empty if none of the drivers are marked as supervise.
-      val driversToRetry = pendingRetryDrivers.filter { d =>
-        d.retryState.get.nextRetry.before(currentTime)
-      }
-      scheduleTasks(
-        copyBuffer(driversToRetry),
-        removeFromPendingRetryDrivers,
-        currentOffers,
-        tasks)
-      // Then we walk through the queued drivers and try to schedule them.
-      scheduleTasks(
-        copyBuffer(queuedDrivers),
-        removeFromQueuedDrivers,
-        currentOffers,
-        tasks)
-    }
-    tasks.foreach { case (offerId, taskInfos) =>
-      driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
-    }
-    for (o <- currentOffers if !tasks.contains(o.offerId)) {
-      driver.declineOffer(o.offerId)
-    }
-  }
-  private def copyBuffer(
-      buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
-    val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
-    buffer.copyToBuffer(newBuffer)
-    newBuffer
-  }
-  def getSchedulerState(): MesosClusterSchedulerState = {
-    stateLock.synchronized {
-      new MesosClusterSchedulerState(
-        frameworkId,
- => s"http://${m.getIp}:${m.getPort}"),
-        copyBuffer(queuedDrivers),
-        copyBuffer(pendingRetryDrivers))
-    }
-  }
-  override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {}
-  override def disconnected(driver: SchedulerDriver): Unit = {}
-  override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = {
-    logInfo(s"Framework re-registered with master ${masterInfo.getId}")
-  }
-  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}
-  override def error(driver: SchedulerDriver, error: String): Unit = {
-    logError("Error received: " + error)
-    markErr()
-  }
-  /**
-   * Check if the task state is a recoverable state that we can relaunch the task.
-   * Task state like TASK_ERROR are not relaunchable state since it wasn't able
-   * to be validated by Mesos.
-   */
-  private def shouldRelaunch(state: MesosTaskState): Boolean = {
-    state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
-    val taskId = status.getTaskId.getValue
-    stateLock.synchronized {
-      if (launchedDrivers.contains(taskId)) {
-        if (status.getReason == Reason.REASON_RECONCILIATION &&
-          !pendingRecover.contains(taskId)) {
-          // Task has already received update and no longer requires reconciliation.
-          return
-        }
-        val state = launchedDrivers(taskId)
-        // Check if the driver is supervise enabled and can be relaunched.
-        if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
-          removeFromLaunchedDrivers(taskId)
-          state.finishDate = Some(new Date())
-          val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
-          val (retries, waitTimeSec) = retryState
-            .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
-            .getOrElse{ (1, 1) }
-          val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
-          val newDriverDescription = state.driverDescription.copy(
-            retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-          pendingRetryDrivers += newDriverDescription
-          pendingRetryDriversState.persist(taskId, newDriverDescription)
-        } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
-          removeFromLaunchedDrivers(taskId)
-          state.finishDate = Some(new Date())
-          if (finishedDrivers.size >= retainedDrivers) {
-            val toRemove = math.max(retainedDrivers / 10, 1)
-            finishedDrivers.trimStart(toRemove)
-          }
-          finishedDrivers += state
-        }
-        state.mesosTaskStatus = Option(status)
-      } else {
-        logError(s"Unable to find driver $taskId in status update")
-      }
-    }
-  }
-  override def frameworkMessage(
-      driver: SchedulerDriver,
-      executorId: ExecutorID,
-      slaveId: SlaveID,
-      message: Array[Byte]): Unit = {}
-  override def executorLost(
-      driver: SchedulerDriver,
-      executorId: ExecutorID,
-      slaveId: SlaveID,
-      status: Int): Unit = {}
-  private def removeFromQueuedDrivers(id: String): Boolean = {
-    val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
-    if (index != -1) {
-      queuedDrivers.remove(index)
-      queuedDriversState.expunge(id)
-      true
-    } else {
-      false
-    }
-  }
-  private def removeFromLaunchedDrivers(id: String): Boolean = {
-    if (launchedDrivers.remove(id).isDefined) {
-      launchedDriversState.expunge(id)
-      true
-    } else {
-      false
-    }
-  }
-  private def removeFromPendingRetryDrivers(id: String): Boolean = {
-    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
-    if (index != -1) {
-      pendingRetryDrivers.remove(index)
-      pendingRetryDriversState.expunge(id)
-      true
-    } else {
-      false
-    }
-  }
-  def getQueuedDriversSize: Int = queuedDrivers.size
-  def getLaunchedDriversSize: Int = launchedDrivers.size
-  def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
deleted file mode 100644
index 1fe9497..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
+++ /dev/null
@@ -1,40 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.mesos
-import com.codahale.metrics.{Gauge, MetricRegistry}
-import org.apache.spark.metrics.source.Source
-private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler)
-  extends Source {
-  override def sourceName: String = "mesos_cluster"
-  override def metricRegistry: MetricRegistry = new MetricRegistry()
-  metricRegistry.register("waitingDrivers"), new Gauge[Int] {
-    override def getValue: Int = scheduler.getQueuedDriversSize
-  })
-  metricRegistry.register("launchedDrivers"), new Gauge[Int] {
-    override def getValue: Int = scheduler.getLaunchedDriversSize
-  })
-  metricRegistry.register("retryDrivers"), new Gauge[Int] {
-    override def getValue: Int = scheduler.getPendingRetryDriversSize
-  })
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
deleted file mode 100644
index 6b9313e..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ /dev/null
@@ -1,642 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.mesos
-import java.util.{Collections, List => JList}
-import java.util.concurrent.locks.ReentrantLock
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
-import org.apache.spark.rpc.RpcEndpointAddress
-import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
- * latency.
- *
- * Unfortunately this has a bit of duplication from [[MesosFineGrainedSchedulerBackend]],
- * but it seems hard to remove this.
- */
-private[spark] class MesosCoarseGrainedSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext,
-    master: String,
-    securityManager: SecurityManager)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
-  with org.apache.mesos.Scheduler
-  with MesosSchedulerUtils {
-  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
-  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
-  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
-  private[this] val shutdownTimeoutMS =
-    conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
-      .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
-  // Synchronization protected by stateLock
-  private[this] var stopCalled: Boolean = false
-  // If shuffle service is enabled, the Spark driver will register with the shuffle service.
-  // This is for cleaning up shuffle files reliably.
-  private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
-  // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new mutable.HashMap[String, Int]
-  var totalCoresAcquired = 0
-  // SlaveID -> Slave
-  // This map accumulates entries for the duration of the job.  Slaves are never deleted, because
-  // we need to maintain e.g. failure state and connection state.
-  private val slaves = new mutable.HashMap[String, Slave]
-  /**
-   * The total number of executors we aim to have. Undefined when not using dynamic allocation.
-   * Initially set to 0 when using dynamic allocation, the executor allocation manager will send
-   * the real initial limit later.
-   */
-  private var executorLimitOption: Option[Int] = {
-    if (Utils.isDynamicAllocationEnabled(conf)) {
-      Some(0)
-    } else {
-      None
-    }
-  }
-  /**
-   *  Return the current executor limit, which may be [[Int.MaxValue]]
-   *  before properly initialized.
-   */
-  private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
-  // private lock object protecting mutable state above. Using the intrinsic lock
-  // may lead to deadlocks since the superclass might also try to lock
-  private val stateLock = new ReentrantLock
-  val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
-  // Offer constraints
-  private val slaveOfferConstraints =
-    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
-  // Reject offers with mismatched constraints in seconds
-  private val rejectOfferDurationForUnmetConstraints =
-    getRejectOfferDurationForUnmetConstraints(sc)
-  // Reject offers when we reached the maximum number of cores for this framework
-  private val rejectOfferDurationForReachedMaxCores =
-    getRejectOfferDurationForReachedMaxCores(sc)
-  // A client for talking to the external shuffle service
-  private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
-    if (shuffleServiceEnabled) {
-      Some(getShuffleClient())
-    } else {
-      None
-    }
-  }
-  // This method is factored out for testability
-  protected def getShuffleClient(): MesosExternalShuffleClient = {
-    new MesosExternalShuffleClient(
-      SparkTransportConf.fromSparkConf(conf, "shuffle"),
-      securityManager,
-      securityManager.isAuthenticationEnabled(),
-      securityManager.isSaslEncryptionEnabled())
-  }
-  var nextMesosTaskId = 0
-  @volatile var appId: String = _
-  def newMesosTaskId(): String = {
-    val id = nextMesosTaskId
-    nextMesosTaskId += 1
-    id.toString
-  }
-  override def start() {
-    super.start()
-    val driver = createSchedulerDriver(
-      master,
-      MesosCoarseGrainedSchedulerBackend.this,
-      sc.sparkUser,
-      sc.appName,
-      sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(,
-      None,
-      None,
-      sc.conf.getOption("spark.mesos.driver.frameworkId")
-    )
-    unsetFrameworkID(sc)
-    startScheduler(driver)
-  }
-  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
-    val environment = Environment.newBuilder()
-    val extraClassPath = conf.getOption("spark.executor.extraClassPath")
-    extraClassPath.foreach { cp =>
-      environment.addVariables(
-        Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
-    }
-    val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
-    // Set the environment variable through a command prefix
-    // to append to the existing value of the variable
-    val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
-      Utils.libraryPathEnvPrefix(Seq(p))
-    }.getOrElse("")
-    environment.addVariables(
-      Environment.Variable.newBuilder()
-        .setName("SPARK_EXECUTOR_OPTS")
-        .setValue(extraJavaOpts)
-        .build())
-    sc.executorEnvs.foreach { case (key, value) =>
-      environment.addVariables(Environment.Variable.newBuilder()
-        .setName(key)
-        .setValue(value)
-        .build())
-    }
-    val command = CommandInfo.newBuilder()
-      .setEnvironment(environment)
-    val uri = conf.getOption("spark.executor.uri")
-      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
-    if (uri.isEmpty) {
-      val executorSparkHome = conf.getOption("spark.mesos.executor.home")
-        .orElse(sc.getSparkHome())
-        .getOrElse {
-          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
-        }
-      val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
-      command.setValue(
-        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
-          .format(prefixEnv, runScript) +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${offer.getHostname}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
-    } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
-      val basename = uri.get.split('/').last.split('.').head
-      command.setValue(
-        s"cd $basename*; $prefixEnv " +
-        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${offer.getHostname}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
-    }
-    conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
-  }
-  protected def driverURL: String = {
-    if (conf.contains("spark.testing")) {
-      "driverURL"
-    } else {
-      RpcEndpointAddress(
-        conf.get(""),
-        conf.get("spark.driver.port").toInt,
-        CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-    }
-  }
-  override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
-  override def registered(
-      d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    appId = frameworkId.getValue
-    mesosExternalShuffleClient.foreach(_.init(appId))
-    markRegistered()
-  }
-  override def sufficientResourcesRegistered(): Boolean = {
-    totalCoresAcquired >= maxCores * minRegisteredRatio
-  }
-  override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
-  override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
-  /**
-   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
-   * unless we've already launched more than we wanted to.
-   */
-  override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
-    stateLock.synchronized {
-      if (stopCalled) {
-        logDebug("Ignoring offers during shutdown")
-        // Driver should simply return a stopped status on race
-        // condition between this.stop() and completing here
-        return
-      }
-      logDebug(s"Received ${offers.size} resource offers.")
-      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
-        val offerAttributes = toAttributeMap(offer.getAttributesList)
-        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
-      }
-      declineUnmatchedOffers(d, unmatchedOffers)
-      handleMatchedOffers(d, matchedOffers)
-    }
-  }
-  private def declineUnmatchedOffers(
-      d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
-    offers.foreach { offer =>
-      declineOffer(d, offer, Some("unmet constraints"),
-        Some(rejectOfferDurationForUnmetConstraints))
-    }
-  }
-  private def declineOffer(
-      d: org.apache.mesos.SchedulerDriver,
-      offer: Offer,
-      reason: Option[String] = None,
-      refuseSeconds: Option[Long] = None): Unit = {
-    val id = offer.getId.getValue
-    val offerAttributes = toAttributeMap(offer.getAttributesList)
-    val mem = getResource(offer.getResourcesList, "mem")
-    val cpus = getResource(offer.getResourcesList, "cpus")
-    val ports = getRangeResource(offer.getResourcesList, "ports")
-    logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
-      s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
- => s" (reason: $r)").getOrElse(""))
-    refuseSeconds match {
-      case Some(seconds) =>
-        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
-        d.declineOffer(offer.getId, filters)
-      case _ => d.declineOffer(offer.getId)
-    }
-  }
-  /**
-   * Launches executors on accepted offers, and declines unused offers. Executors are launched
-   * round-robin on offers.
-   *
-   * @param d SchedulerDriver
-   * @param offers Mesos offers that match attribute constraints
-   */
-  private def handleMatchedOffers(
-      d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
-    val tasks = buildMesosTasks(offers)
-    for (offer <- offers) {
-      val offerAttributes = toAttributeMap(offer.getAttributesList)
-      val offerMem = getResource(offer.getResourcesList, "mem")
-      val offerCpus = getResource(offer.getResourcesList, "cpus")
-      val offerPorts = getRangeResource(offer.getResourcesList, "ports")
-      val id = offer.getId.getValue
-      if (tasks.contains(offer.getId)) { // accept
-        val offerTasks = tasks(offer.getId)
-        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
-          s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
-          s"  Launching ${offerTasks.size} Mesos tasks.")
-        for (task <- offerTasks) {
-          val taskId = task.getTaskId
-          val mem = getResource(task.getResourcesList, "mem")
-          val cpus = getResource(task.getResourcesList, "cpus")
-          val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
-          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
-            s" ports: $ports")
-        }
-        d.launchTasks(
-          Collections.singleton(offer.getId),
-          offerTasks.asJava)
-      } else if (totalCoresAcquired >= maxCores) {
-        // Reject an offer for a configurable amount of time to avoid starving other frameworks
-        declineOffer(d, offer, Some("reached spark.cores.max"),
-          Some(rejectOfferDurationForReachedMaxCores))
-      } else {
-        declineOffer(d, offer)
-      }
-    }
-  }
-  /**
-   * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
-   * per-task memory and IO, tasks are round-robin assigned to offers.
-   *
-   * @param offers Mesos offers that match attribute constraints
-   * @return A map from OfferID to a list of Mesos tasks to launch on that offer
-   */
-  private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
-    // offerID -> tasks
-    val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
-    // offerID -> resources
-    val remainingResources = mutable.Map( =>
-      (offer.getId.getValue, offer.getResourcesList)): _*)
-    var launchTasks = true
-    // TODO(mgummelt): combine offers for a single slave
-    //
-    // round-robin create executors on the available offers
-    while (launchTasks) {
-      launchTasks = false
-      for (offer <- offers) {
-        val slaveId = offer.getSlaveId.getValue
-        val offerId = offer.getId.getValue
-        val resources = remainingResources(offerId)
-        if (canLaunchTask(slaveId, resources)) {
-          // Create a task
-          launchTasks = true
-          val taskId = newMesosTaskId()
-          val offerCPUs = getResource(resources, "cpus").toInt
-          val taskCPUs = executorCores(offerCPUs)
-          val taskMemory = executorMemory(sc)
-          slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
-          val (resourcesLeft, resourcesToUse) =
-            partitionTaskResources(resources, taskCPUs, taskMemory)
-          val taskBuilder = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
-            .setName("Task " + taskId)
-          taskBuilder.addAllResources(resourcesToUse.asJava)
-          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-              image,
-              sc.conf,
-              taskBuilder.getContainerBuilder
-            )
-          }
-          tasks(offer.getId) ::=
-          remainingResources(offerId) = resourcesLeft.asJava
-          totalCoresAcquired += taskCPUs
-          coresByTaskId(taskId) = taskCPUs
-        }
-      }
-    }
-    tasks.toMap
-  }
-  /** Extracts task needed resources from a list of available resources. */
-  private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
-    : (List[Resource], List[Resource]) = {
-    // partition cpus & mem
-    val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
-    val (afterMemResources, memResourcesToUse) =
-      partitionResources(afterCPUResources.asJava, "mem", taskMemory)
-    // If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
-    // on the same host. This essentially means one executor per host.
-    // TODO: handle network isolator case
-    val (nonPortResources, portResourcesToUse) =
-      partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
-    (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
-  }
-  private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
-    val offerMem = getResource(resources, "mem")
-    val offerCPUs = getResource(resources, "cpus").toInt
-    val cpus = executorCores(offerCPUs)
-    val mem = executorMemory(sc)
-    val ports = getRangeResource(resources, "ports")
-    val meetsPortRequirements = checkPorts(sc.conf, ports)
-    cpus > 0 &&
-      cpus <= offerCPUs &&
-      cpus + totalCoresAcquired <= maxCores &&
-      mem <= offerMem &&
-      numExecutors() < executorLimit &&
-      slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
-      meetsPortRequirements
-  }
-  private def executorCores(offerCPUs: Int): Int = {
-    sc.conf.getInt("spark.executor.cores",
-      math.min(offerCPUs, maxCores - totalCoresAcquired))
-  }
-  override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
-    val taskId = status.getTaskId.getValue
-    val slaveId = status.getSlaveId.getValue
-    val state = TaskState.fromMesos(status.getState)
-    logInfo(s"Mesos task $taskId is now ${status.getState}")
-    stateLock.synchronized {
-      val slave = slaves(slaveId)
-      // If the shuffle service is enabled, have the driver register with each one of the
-      // shuffle services. This allows the shuffle services to clean up state associated with
-      // this application when the driver exits. There is currently not a great way to detect
-      // this through Mesos, since the shuffle services are set up independently.
-      if (state.equals(TaskState.RUNNING) &&
-          shuffleServiceEnabled &&
-          !slave.shuffleRegistered) {
-        assume(mesosExternalShuffleClient.isDefined,
-          "External shuffle client was not instantiated even though shuffle service is enabled.")
-        // TODO: Remove this and allow the MesosExternalShuffleService to detect
-        // framework termination when new Mesos Framework HTTP API is available.
-        val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
-        logDebug(s"Connecting to shuffle service on slave $slaveId, " +
-            s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
-        mesosExternalShuffleClient.get
-          .registerDriverWithShuffleService(
-            slave.hostname,
-            externalShufflePort,
-            sc.conf.getTimeAsMs("",
-              s"${sc.conf.getTimeAsMs("", "120s")}ms"),
-            sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
-        slave.shuffleRegistered = true
-      }
-      if (TaskState.isFinished(state)) {
-        // Remove the cores we have remembered for this task, if it's in the hashmap
-        for (cores <- coresByTaskId.get(taskId)) {
-          totalCoresAcquired -= cores
-          coresByTaskId -= taskId
-        }
-        // If it was a failure, mark the slave as failed for blacklisting purposes
-        if (TaskState.isFailed(state)) {
-          slave.taskFailures += 1
-          if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
-            logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
-                "is Spark installed on it?")
-          }
-        }
-        executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
-        // In case we'd rejected everything before but have now lost a node
-        d.reviveOffers()
-      }
-    }
-  }
-  override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
-    logError(s"Mesos error: $message")
-    scheduler.error(message)
-  }
-  override def stop() {
-    // Make sure we're not launching tasks during shutdown
-    stateLock.synchronized {
-      if (stopCalled) {
-        logWarning("Stop called multiple times, ignoring")
-        return
-      }
-      stopCalled = true
-      super.stop()
-    }
-    // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
-    // See SPARK-12330
-    val startTime = System.nanoTime()
-    // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
-    while (numExecutors() > 0 &&
-      System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) {
-      Thread.sleep(100)
-    }
-    if (numExecutors() > 0) {
-      logWarning(s"Timed out waiting for ${numExecutors()} remaining executors "
-        + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files "
-        + "on the mesos nodes.")
-    }
-    // Close the mesos external shuffle client if used
-    mesosExternalShuffleClient.foreach(_.close())
-    if (mesosDriver != null) {
-      mesosDriver.stop()
-    }
-  }
-  override def frameworkMessage(
-      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-  /**
-   * Called when a slave is lost or a Mesos task finished. Updates local view on
-   * what tasks are running. It also notifies the driver that an executor was removed.
-   */
-  private def executorTerminated(
-      d: org.apache.mesos.SchedulerDriver,
-      slaveId: String,
-      taskId: String,
-      reason: String): Unit = {
-    stateLock.synchronized {
-      // Do not call removeExecutor() after this scheduler backend was stopped because
-      // removeExecutor() internally will send a message to the driver endpoint but
-      // the driver endpoint is not available now, otherwise an exception will be thrown.
-      if (!stopCalled) {
-        removeExecutor(taskId, SlaveLost(reason))
-      }
-      slaves(slaveId).taskIDs.remove(taskId)
-    }
-  }
-  override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = {
-    logInfo(s"Mesos slave lost: ${slaveId.getValue}")
-  }
-  override def executorLost(
-      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
-    logInfo("Mesos executor lost: %s".format(e.getValue))
-  }
-  override def applicationId(): String =
-    Option(appId).getOrElse {
-      logWarning("Application ID is not initialized yet.")
-      super.applicationId
-    }
-  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    // We don't truly know if we can fulfill the full amount of executors
-    // since at coarse grain it depends on the amount of slaves available.
-    logInfo("Capping the total amount of executors to " + requestedTotal)
-    executorLimitOption = Some(requestedTotal)
-    true
-  }
-  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
-    if (mesosDriver == null) {
-      logWarning("Asked to kill executors before the Mesos driver was started.")
-      false
-    } else {
-      for (executorId <- executorIds) {
-        val taskId = TaskID.newBuilder().setValue(executorId).build()
-        mesosDriver.killTask(taskId)
-      }
-      // no need to adjust `executorLimitOption` since the AllocationManager already communicated
-      // the desired limit through a call to `doRequestTotalExecutors`.
-      // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
-      true
-    }
-  }
-  private def numExecutors(): Int = {
-  }
-private class Slave(val hostname: String) {
-  val taskIDs = new mutable.HashSet[String]()
-  var taskFailures = 0
-  var shuffleRegistered = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
deleted file mode 100644
index f1e48fa..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ /dev/null
@@ -1,451 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.mesos
-import java.util.{ArrayList => JArrayList, Collections, List => JList}
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{HashMap, HashSet}
-import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.protobuf.ByteString
-import org.apache.spark.{SparkContext, SparkException, TaskState}
-import org.apache.spark.executor.MesosExecutorBackend
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.util.Utils
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
- */
-private[spark] class MesosFineGrainedSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext,
-    master: String)
-  extends SchedulerBackend
-  with org.apache.mesos.Scheduler
-  with MesosSchedulerUtils {
-  // Stores the slave ids that has launched a Mesos executor.
-  val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
-  val taskIdToSlaveId = new HashMap[Long, String]
-  // An ExecutorInfo for our tasks
-  var execArgs: Array[Byte] = null
-  var classLoader: ClassLoader = null
-  // The listener bus to publish executor added/removed events.
-  val listenerBus = sc.listenerBus
-  private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
-  // Offer constraints
-  private[this] val slaveOfferConstraints =
-    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
-  // reject offers with mismatched constraints in seconds
-  private val rejectOfferDurationForUnmetConstraints =
-    getRejectOfferDurationForUnmetConstraints(sc)
-  @volatile var appId: String = _
-  override def start() {
-    classLoader = Thread.currentThread.getContextClassLoader
-    val driver = createSchedulerDriver(
-      master,
-      MesosFineGrainedSchedulerBackend.this,
-      sc.sparkUser,
-      sc.appName,
-      sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(,
-      Option.empty,
-      Option.empty,
-      sc.conf.getOption("spark.mesos.driver.frameworkId")
-    )
-    unsetFrameworkID(sc)
-    startScheduler(driver)
-  }
-  /**
-   * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
-   * @param availableResources Available resources that is offered by Mesos
-   * @param execId The executor id to assign to this new executor.
-   * @return A tuple of the new mesos executor info and the remaining available resources.
-   */
-  def createExecutorInfo(
-      availableResources: JList[Resource],
-      execId: String): (MesosExecutorInfo, JList[Resource]) = {
-    val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
-      .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
-      .getOrElse {
-      throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
-    }
-    val environment = Environment.newBuilder()
-    sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
-      environment.addVariables(
-        Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
-    }
-    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
-    val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
-      Utils.libraryPathEnvPrefix(Seq(p))
-    }.getOrElse("")
-    environment.addVariables(
-      Environment.Variable.newBuilder()
-        .setName("SPARK_EXECUTOR_OPTS")
-        .setValue(extraJavaOpts)
-        .build())
-    sc.executorEnvs.foreach { case (key, value) =>
-      environment.addVariables(Environment.Variable.newBuilder()
-        .setName(key)
-        .setValue(value)
-        .build())
-    }
-    val command = CommandInfo.newBuilder()
-      .setEnvironment(environment)
-    val uri = sc.conf.getOption("spark.executor.uri")
-      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
-    val executorBackendName = classOf[MesosExecutorBackend].getName
-    if (uri.isEmpty) {
-      val executorPath = new File(executorSparkHome, "/bin/spark-class").getPath
-      command.setValue(s"$prefixEnv $executorPath $executorBackendName")
-    } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
-      val basename = uri.get.split('/').last.split('.').head
-      command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
-    }
-    val builder = MesosExecutorInfo.newBuilder()
-    val (resourcesAfterCpu, usedCpuResources) =
-      partitionResources(availableResources, "cpus", mesosExecutorCores)
-    val (resourcesAfterMem, usedMemResources) =
-      partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))
-    builder.addAllResources(usedCpuResources.asJava)
-    builder.addAllResources(usedMemResources.asJava)
-    sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
-    val executorInfo = builder
-      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
-      .setCommand(command)
-      .setData(ByteString.copyFrom(createExecArg()))
-    sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-        image,
-        sc.conf,
-        executorInfo.getContainerBuilder()
-      )
-    }
-    (, resourcesAfterMem.asJava)
-  }
-  /**
-   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
-   * containing all the spark.* system properties in the form of (String, String) pairs.
-   */
-  private def createExecArg(): Array[Byte] = {
-    if (execArgs == null) {
-      val props = new HashMap[String, String]
-      for ((key, value) <- sc.conf.getAll) {
-        props(key) = value
-      }
-      // Serialize the map as an array of (String, String) pairs
-      execArgs = Utils.serialize(props.toArray)
-    }
-    execArgs
-  }
-  override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
-  override def registered(
-      d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    inClassLoader() {
-      appId = frameworkId.getValue
-      logInfo("Registered as framework ID " + appId)
-      markRegistered()
-    }
-  }
-  private def inClassLoader()(fun: => Unit) = {
-    val oldClassLoader = Thread.currentThread.getContextClassLoader
-    Thread.currentThread.setContextClassLoader(classLoader)
-    try {
-      fun
-    } finally {
-      Thread.currentThread.setContextClassLoader(oldClassLoader)
-    }
-  }
-  override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
-  override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
-  private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
-    val builder = new StringBuilder
-    tasks.asScala.foreach { t =>
-      builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
-        .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
-        .append("Task resources: ").append(t.getResourcesList).append("\n")
-        .append("Executor resources: ").append(t.getExecutor.getResourcesList)
-        .append("---------------------------------------------\n")
-    }
-    builder.toString()
-  }
-  /**
-   * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
-   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
-   * tasks are balanced across the cluster.
-   */
-  override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
-    inClassLoader() {
-      // Fail first on offers with unmet constraints
-      val (offersMatchingConstraints, offersNotMatchingConstraints) =
-        offers.asScala.partition { o =>
-          val offerAttributes = toAttributeMap(o.getAttributesList)
-          val meetsConstraints =
-            matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
-          // add some debug messaging
-          if (!meetsConstraints) {
-            val id = o.getId.getValue
-            logDebug(s"Declining offer: $id with attributes: $offerAttributes")
-          }
-          meetsConstraints
-        }
-      // These offers do not meet constraints. We don't need to see them again.
-      // Decline the offer for a long period of time.
-      offersNotMatchingConstraints.foreach { o =>
-        d.declineOffer(o.getId, Filters.newBuilder()
-          .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
-      }
-      // Of the matching constraints, see which ones give us enough memory and cores
-      val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
-        val mem = getResource(o.getResourcesList, "mem")
-        val cpus = getResource(o.getResourcesList, "cpus")
-        val slaveId = o.getSlaveId.getValue
-        val offerAttributes = toAttributeMap(o.getAttributesList)
-        // check offers for
-        //  1. Memory requirements
-        //  2. CPU requirements - need at least 1 for executor, 1 for task
-        val meetsMemoryRequirements = mem >= executorMemory(sc)
-        val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
-        val meetsRequirements =
-          (meetsMemoryRequirements && meetsCPURequirements) ||
-          (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
-        val debugstr = if (meetsRequirements) "Accepting" else "Declining"
-        logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
-          + s"$offerAttributes mem: $mem cpu: $cpus")
-        meetsRequirements
-      }
-      // Decline offers we ruled out immediately
-      unUsableOffers.foreach(o => d.declineOffer(o.getId))
-      val workerOffers = { o =>
-        val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
-          getResource(o.getResourcesList, "cpus").toInt
-        } else {
-          // If the Mesos executor has not been started on this slave yet, set aside a few
-          // cores for the Mesos executor by offering fewer cores to the Spark executor
-          (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
-        }
-        new WorkerOffer(
-          o.getSlaveId.getValue,
-          o.getHostname,
-          cpus)
-      }
-      val slaveIdToOffer = => o.getSlaveId.getValue -> o).toMap
-      val slaveIdToWorkerOffer = => o.executorId -> o).toMap
-      val slaveIdToResources = new HashMap[String, JList[Resource]]()
-      usableOffers.foreach { o =>
-        slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
-      }
-      val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
-      val slavesIdsOfAcceptedOffers = HashSet[String]()
-      // Call into the TaskSchedulerImpl
-      val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
-      acceptedOffers
-        .foreach { offer =>
-          offer.foreach { taskDesc =>
-            val slaveId = taskDesc.executorId
-            slavesIdsOfAcceptedOffers += slaveId
-            taskIdToSlaveId(taskDesc.taskId) = slaveId
-            val (mesosTask, remainingResources) = createMesosTask(
-              taskDesc,
-              slaveIdToResources(slaveId),
-              slaveId)
-            mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
-              .add(mesosTask)
-            slaveIdToResources(slaveId) = remainingResources
-          }
-        }
-      // Reply to the offers
-      val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
-      mesosTasks.foreach { case (slaveId, tasks) =>
-        slaveIdToWorkerOffer.get(slaveId).foreach(o =>
-, slaveId,
-            // TODO: Add support for log urls for Mesos
-            new ExecutorInfo(, o.cores, Map.empty)))
-        )
-        logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
-        d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
-      }
-      // Decline offers that weren't used
-      // NOTE: This logic assumes that we only get a single offer for each host in a given batch
-      for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
-        d.declineOffer(o.getId)
-      }
-    }
-  }
-  /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */
-  def createMesosTask(
-      task: TaskDescription,
-      resources: JList[Resource],
-      slaveId: String): (MesosTaskInfo, JList[Resource]) = {
-    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) {
-      (slaveIdToExecutorInfo(slaveId), resources)
-    } else {
-      createExecutorInfo(resources, slaveId)
-    }
-    slaveIdToExecutorInfo(slaveId) = executorInfo
-    val (finalResources, cpuResources) =
-      partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
-    val taskInfo = MesosTaskInfo.newBuilder()
-      .setTaskId(taskId)
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(executorInfo)
-      .setName(
-      .addAllResources(cpuResources.asJava)
-      .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
-      .build()
-    (taskInfo, finalResources.asJava)
-  }
-  override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
-    inClassLoader() {
-      val tid = status.getTaskId.getValue.toLong
-      val state = TaskState.fromMesos(status.getState)
-      synchronized {
-        if (TaskState.isFailed(TaskState.fromMesos(status.getState))
-          && taskIdToSlaveId.contains(tid)) {
-          // We lost the executor on this slave, so remember that it's gone
-          removeExecutor(taskIdToSlaveId(tid), "Lost executor")
-        }
-        if (TaskState.isFinished(state)) {
-          taskIdToSlaveId.remove(tid)
-        }
-      }
-      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
-    }
-  }
-  override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
-    inClassLoader() {
-      logError("Mesos error: " + message)
-      markErr()
-      scheduler.error(message)
-    }
-  }
-  override def stop() {
-    if (mesosDriver != null) {
-      mesosDriver.stop()
-    }
-  }
-  override def reviveOffers() {
-    mesosDriver.reviveOffers()
-  }
-  override def frameworkMessage(
-      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-  /**
-   * Remove executor associated with slaveId in a thread safe manner.
-   */
-  private def removeExecutor(slaveId: String, reason: String) = {
-    synchronized {
-, slaveId, reason))
-      slaveIdToExecutorInfo -= slaveId
-    }
-  }
-  private def recordSlaveLost(
-      d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
-    inClassLoader() {
-      logInfo("Mesos slave lost: " + slaveId.getValue)
-      removeExecutor(slaveId.getValue, reason.toString)
-      scheduler.executorLost(slaveId.getValue, reason)
-    }
-  }
-  override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID) {
-    recordSlaveLost(d, slaveId, SlaveLost())
-  }
-  override def executorLost(
-      d: org.apache.mesos.SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
-                                                                 slaveId.getValue))
-    recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true))
-  }
-  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
-    mesosDriver.killTask(
-      TaskID.newBuilder()
-        .setValue(taskId.toString).build()
-    )
-  }
-  // TODO: query Mesos for number of cores
-  override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8)
-  override def applicationId(): String =
-    Option(appId).getOrElse {
-      logWarning("Application ID is not initialized yet.")
-      super.applicationId
-    }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
deleted file mode 100644
index 3fe0674..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ /dev/null
@@ -1,165 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.mesos
-import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
-import org.apache.mesos.Protos.ContainerInfo.DockerInfo
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.internal.Logging
- * A collection of utility functions which can be used by both the
- * MesosSchedulerBackend and the [[MesosFineGrainedSchedulerBackend]].
- */
-private[mesos] object MesosSchedulerBackendUtil extends Logging {
-  /**
-   * Parse a comma-delimited list of volume specs, each of which
-   * takes the form [host-dir:]container-dir[:rw|:ro].
-   */
-  def parseVolumesSpec(volumes: String): List[Volume] = {
-    volumes.split(",").map(_.split(":")).flatMap { spec =>
-        val vol: Volume.Builder = Volume
-          .newBuilder()
-          .setMode(Volume.Mode.RW)
-        spec match {
-          case Array(container_path) =>
-            Some(vol.setContainerPath(container_path))
-          case Array(container_path, "rw") =>
-            Some(vol.setContainerPath(container_path))
-          case Array(container_path, "ro") =>
-            Some(vol.setContainerPath(container_path)
-              .setMode(Volume.Mode.RO))
-          case Array(host_path, container_path) =>
-            Some(vol.setContainerPath(container_path)
-              .setHostPath(host_path))
-          case Array(host_path, container_path, "rw") =>
-            Some(vol.setContainerPath(container_path)
-              .setHostPath(host_path))
-          case Array(host_path, container_path, "ro") =>
-            Some(vol.setContainerPath(container_path)
-              .setHostPath(host_path)
-              .setMode(Volume.Mode.RO))
-          case spec =>
-            logWarning(s"Unable to parse volume specs: $volumes. "
-              + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
-            None
-      }
-    }
-    .map { }
-    .toList
-  }
-  /**
-   * Parse a comma-delimited list of port mapping specs, each of which
-   * takes the form host_port:container_port[:udp|:tcp]
-   *
-   * Note:
-   * the docker form is [ip:]host_port:container_port, but the DockerInfo
-   * message has no field for 'ip', and instead has a 'protocol' field.
-   * Docker itself only appears to support TCP, so this alternative form
-   * anticipates the expansion of the docker form to allow for a protocol
-   * and leaves open the chance for mesos to begin to accept an 'ip' field
-   */
-  def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = {
-    portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] =>
-      val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping
-        .newBuilder()
-        .setProtocol("tcp")
-      spec match {
-        case Array(host_port, container_port) =>
-          Some(portmap.setHostPort(host_port.toInt)
-            .setContainerPort(container_port.toInt))
-        case Array(host_port, container_port, protocol) =>
-          Some(portmap.setHostPort(host_port.toInt)
-            .setContainerPort(container_port.toInt)
-            .setProtocol(protocol))
-        case spec =>
-          logWarning(s"Unable to parse port mapping specs: $portmaps. "
-            + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"")
-          None
-      }
-    }
-    .map { }
-    .toList
-  }
-  /**
-   * Construct a DockerInfo structure and insert it into a ContainerInfo
-   */
-  def addDockerInfo(
-      container: ContainerInfo.Builder,
-      image: String,
-      containerizer: String,
-      forcePullImage: Boolean = false,
-      volumes: Option[List[Volume]] = None,
-      portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
-    containerizer match {
-      case "docker" =>
-        container.setType(ContainerInfo.Type.DOCKER)
-        val docker = ContainerInfo.DockerInfo.newBuilder()
-          .setImage(image)
-          .setForcePullImage(forcePullImage)
-        // TODO (mgummelt): Remove this. Portmaps have no effect,
-        //                  as we don't support bridge networking.
-        portmaps.foreach(_.foreach(docker.addPortMappings))
-        container.setDocker(docker)
-      case "mesos" =>
-        container.setType(ContainerInfo.Type.MESOS)
-        val imageProto = Image.newBuilder()
-          .setType(Image.Type.DOCKER)
-          .setDocker(Image.Docker.newBuilder().setName(image))
-          .setCached(!forcePullImage)
-        container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
-      case _ =>
-        throw new SparkException(
-          "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
-    }
-    volumes.foreach(_.foreach(container.addVolumes))
-  }
-  /**
-   * Setup a docker containerizer from MesosDriverDescription scheduler properties
-   */
-  def setupContainerBuilderDockerInfo(
-    imageName: String,
-    conf: SparkConf,
-    builder: ContainerInfo.Builder): Unit = {
-    val forcePullImage = conf
-      .getOption("spark.mesos.executor.docker.forcePullImage")
-      .exists(_.equals("true"))
-    val volumes = conf
-      .getOption("spark.mesos.executor.docker.volumes")
-      .map(parseVolumesSpec)
-    val portmaps = conf
-      .getOption("spark.mesos.executor.docker.portmaps")
-      .map(parsePortMappingsSpec)
-    val containerizer = conf.get("spark.mesos.containerizer", "docker")
-    addDockerInfo(
-      builder,
-      imageName,
-      containerizer,
-      forcePullImage = forcePullImage,
-      volumes = volumes,
-      portmaps = portmaps)
-    logDebug("setupContainerDockerInfo: using docker image: " + imageName)
-  }

To unsubscribe, e-mail:
For additional commands, e-mail: