You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/08/26 19:25:38 UTC

[3/7] spark git commit: [SPARK-16967] move mesos to module

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
new file mode 100644
index 0000000..0b45499
--- /dev/null
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{Collections, Date, List => JList}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.mesos.{Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.Environment.Variable
+import org.apache.mesos.Protos.TaskStatus.Reason
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.Utils
+
+/**
+ * Tracks the current state of a Mesos Task that runs a Spark driver.
+ * @param driverDescription Submitted driver description from
+ * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]]
+ * @param taskId Mesos TaskID generated for the task
+ * @param slaveId Slave ID that the task is assigned to
+ * @param mesosTaskStatus The last known task status update.
+ * @param startDate The date the task was launched
+ * @param finishDate The date the task finished
+ * @param frameworkId Mesos framework ID the task registers with
+ */
+private[spark] class MesosClusterSubmissionState(
+    val driverDescription: MesosDriverDescription,
+    val taskId: TaskID,
+    val slaveId: SlaveID,
+    var mesosTaskStatus: Option[TaskStatus],
+    var startDate: Date,
+    var finishDate: Option[Date],
+    val frameworkId: String)
+  extends Serializable {
+
+  def copy(): MesosClusterSubmissionState = {
+    new MesosClusterSubmissionState(
+      driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId)
+  }
+}
+
+/**
+ * Tracks the retry state of a driver, which includes the next time it should be scheduled
+ * and necessary information to do exponential backoff.
+ * This class is not thread-safe, and we expect the caller to handle synchronizing state.
+ *
+ * @param lastFailureStatus Last Task status when it failed.
+ * @param retries Number of times it has been retried.
+ * @param nextRetry Time at which it should be retried next
+ * @param waitTime The amount of time driver is scheduled to wait until next retry.
+ */
+private[spark] class MesosClusterRetryState(
+    val lastFailureStatus: TaskStatus,
+    val retries: Int,
+    val nextRetry: Date,
+    val waitTime: Int) extends Serializable {
+  def copy(): MesosClusterRetryState =
+    new MesosClusterRetryState(lastFailureStatus, retries, nextRetry, waitTime)
+}
+
+/**
+ * The full state of the cluster scheduler, currently being used for displaying
+ * information on the UI.
+ *
+ * @param frameworkId Mesos Framework id for the cluster scheduler.
+ * @param masterUrl The Mesos master url
+ * @param queuedDrivers All drivers queued to be launched
+ * @param launchedDrivers All launched or running drivers
+ * @param finishedDrivers All terminated drivers
+ * @param pendingRetryDrivers All drivers pending to be retried
+ */
+private[spark] class MesosClusterSchedulerState(
+    val frameworkId: String,
+    val masterUrl: Option[String],
+    val queuedDrivers: Iterable[MesosDriverDescription],
+    val launchedDrivers: Iterable[MesosClusterSubmissionState],
+    val finishedDrivers: Iterable[MesosClusterSubmissionState],
+    val pendingRetryDrivers: Iterable[MesosDriverDescription])
+
+/**
+ * The full state of a Mesos driver, that is being used to display driver information on the UI.
+ */
+private[spark] class MesosDriverState(
+    val state: String,
+    val description: MesosDriverDescription,
+    val submissionState: Option[MesosClusterSubmissionState] = None)
+
+/**
+ * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
+ * as Mesos tasks in a Mesos cluster.
+ * All drivers are launched asynchronously by the framework, which will eventually be launched
+ * by one of the slaves in the cluster. The results of the driver will be stored in slave's task
+ * sandbox which is accessible by visiting the Mesos UI.
+ * This scheduler supports recovery by persisting all its state and performs task reconciliation
+ * on recover, which gets all the latest state for all the drivers from Mesos master.
+ */
+private[spark] class MesosClusterScheduler(
+    engineFactory: MesosClusterPersistenceEngineFactory,
+    conf: SparkConf)
+  extends Scheduler with MesosSchedulerUtils {
+  var frameworkUrl: String = _
+  private val metricsSystem =
+    MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
+  private val master = conf.get("spark.master")
+  private val appName = conf.get("spark.app.name")
+  private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
+  private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
+  private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
+  private val schedulerState = engineFactory.createEngine("scheduler")
+  private val stateLock = new Object()
+  private val finishedDrivers =
+    new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
+  private var frameworkId: String = null
+  // Holds all the launched drivers and current launch state, keyed by driver id.
+  private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
+  // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
+  // All drivers that are loaded after failover are added here, as we need get the latest
+  // state of the tasks from Mesos.
+  private val pendingRecover = new mutable.HashMap[String, SlaveID]()
+  // Stores all the submitted drivers that hasn't been launched.
+  private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
+  // All supervised drivers that are waiting to retry after termination.
+  private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
+  private val queuedDriversState = engineFactory.createEngine("driverQueue")
+  private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
+  private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  // Flag to mark if the scheduler is ready to be called, which is until the scheduler
+  // is registered with Mesos master.
+  @volatile protected var ready = false
+  private var masterInfo: Option[MasterInfo] = None
+
+  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
+    val c = new CreateSubmissionResponse
+    if (!ready) {
+      c.success = false
+      c.message = "Scheduler is not ready to take requests"
+      return c
+    }
+
+    stateLock.synchronized {
+      if (isQueueFull()) {
+        c.success = false
+        c.message = "Already reached maximum submission size"
+        return c
+      }
+      c.submissionId = desc.submissionId
+      queuedDriversState.persist(desc.submissionId, desc)
+      queuedDrivers += desc
+      c.success = true
+    }
+    c
+  }
+
+  def killDriver(submissionId: String): KillSubmissionResponse = {
+    val k = new KillSubmissionResponse
+    if (!ready) {
+      k.success = false
+      k.message = "Scheduler is not ready to take requests"
+      return k
+    }
+    k.submissionId = submissionId
+    stateLock.synchronized {
+      // We look for the requested driver in the following places:
+      // 1. Check if submission is running or launched.
+      // 2. Check if it's still queued.
+      // 3. Check if it's in the retry list.
+      // 4. Check if it has already completed.
+      if (launchedDrivers.contains(submissionId)) {
+        val task = launchedDrivers(submissionId)
+        mesosDriver.killTask(task.taskId)
+        k.success = true
+        k.message = "Killing running driver"
+      } else if (removeFromQueuedDrivers(submissionId)) {
+        k.success = true
+        k.message = "Removed driver while it's still pending"
+      } else if (removeFromPendingRetryDrivers(submissionId)) {
+        k.success = true
+        k.message = "Removed driver while it's being retried"
+      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+        k.success = false
+        k.message = "Driver already terminated"
+      } else {
+        k.success = false
+        k.message = "Cannot find driver"
+      }
+    }
+    k
+  }
+
+  def getDriverStatus(submissionId: String): SubmissionStatusResponse = {
+    val s = new SubmissionStatusResponse
+    if (!ready) {
+      s.success = false
+      s.message = "Scheduler is not ready to take requests"
+      return s
+    }
+    s.submissionId = submissionId
+    stateLock.synchronized {
+      if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
+        s.success = true
+        s.driverState = "QUEUED"
+      } else if (launchedDrivers.contains(submissionId)) {
+        s.success = true
+        s.driverState = "RUNNING"
+        launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
+      } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+        s.success = true
+        s.driverState = "FINISHED"
+        finishedDrivers
+          .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
+          .foreach(state => s.message = state.toString)
+      } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
+        val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+          .get.retryState.get.lastFailureStatus
+        s.success = true
+        s.driverState = "RETRYING"
+        s.message = status.toString
+      } else {
+        s.success = false
+        s.driverState = "NOT_FOUND"
+      }
+    }
+    s
+  }
+
+  /**
+   * Gets the driver state to be displayed on the Web UI.
+   */
+  def getDriverState(submissionId: String): Option[MesosDriverState] = {
+    stateLock.synchronized {
+      queuedDrivers.find(_.submissionId.equals(submissionId))
+        .map(d => new MesosDriverState("QUEUED", d))
+        .orElse(launchedDrivers.get(submissionId)
+          .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d))))
+        .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
+          .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d))))
+        .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+          .map(d => new MesosDriverState("RETRYING", d)))
+    }
+  }
+
+  private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity
+
+  /**
+   * Recover scheduler state that is persisted.
+   * We still need to do task reconciliation to be up to date of the latest task states
+   * as it might have changed while the scheduler is failing over.
+   */
+  private def recoverState(): Unit = {
+    stateLock.synchronized {
+      launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
+        launchedDrivers(state.taskId.getValue) = state
+        pendingRecover(state.taskId.getValue) = state.slaveId
+      }
+      queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
+      // There is potential timing issue where a queued driver might have been launched
+      // but the scheduler shuts down before the queued driver was able to be removed
+      // from the queue. We try to mitigate this issue by walking through all queued drivers
+      // and remove if they're already launched.
+      queuedDrivers
+        .filter(d => launchedDrivers.contains(d.submissionId))
+        .foreach(d => removeFromQueuedDrivers(d.submissionId))
+      pendingRetryDriversState.fetchAll[MesosDriverDescription]()
+        .foreach(s => pendingRetryDrivers += s)
+      // TODO: Consider storing finished drivers so we can show them on the UI after
+      // failover. For now we clear the history on each recovery.
+      finishedDrivers.clear()
+    }
+  }
+
+  /**
+   * Starts the cluster scheduler and wait until the scheduler is registered.
+   * This also marks the scheduler to be ready for requests.
+   */
+  def start(): Unit = {
+    // TODO: Implement leader election to make sure only one framework running in the cluster.
+    val fwId = schedulerState.fetch[String]("frameworkId")
+    fwId.foreach { id =>
+      frameworkId = id
+    }
+    recoverState()
+    metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
+    metricsSystem.start()
+    val driver = createSchedulerDriver(
+      master,
+      MesosClusterScheduler.this,
+      Utils.getCurrentUserName(),
+      appName,
+      conf,
+      Some(frameworkUrl),
+      Some(true),
+      Some(Integer.MAX_VALUE),
+      fwId)
+
+    startScheduler(driver)
+    ready = true
+  }
+
+  def stop(): Unit = {
+    ready = false
+    metricsSystem.report()
+    metricsSystem.stop()
+    mesosDriver.stop(true)
+  }
+
+  override def registered(
+      driver: SchedulerDriver,
+      newFrameworkId: FrameworkID,
+      masterInfo: MasterInfo): Unit = {
+    logInfo("Registered as framework ID " + newFrameworkId.getValue)
+    if (newFrameworkId.getValue != frameworkId) {
+      frameworkId = newFrameworkId.getValue
+      schedulerState.persist("frameworkId", frameworkId)
+    }
+    markRegistered()
+
+    stateLock.synchronized {
+      this.masterInfo = Some(masterInfo)
+      if (!pendingRecover.isEmpty) {
+        // Start task reconciliation if we need to recover.
+        val statuses = pendingRecover.collect {
+          case (taskId, slaveId) =>
+            val newStatus = TaskStatus.newBuilder()
+              .setTaskId(TaskID.newBuilder().setValue(taskId).build())
+              .setSlaveId(slaveId)
+              .setState(MesosTaskState.TASK_STAGING)
+              .build()
+            launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus))
+              .getOrElse(newStatus)
+        }
+        // TODO: Page the status updates to avoid trying to reconcile
+        // a large amount of tasks at once.
+        driver.reconcileTasks(statuses.toSeq.asJava)
+      }
+    }
+  }
+
+  private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
+    desc.conf.getOption("spark.executor.uri")
+      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+  }
+
+  private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
+    s"${frameworkId}-${desc.submissionId}"
+  }
+
+  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+    m.updated(k, f(m.getOrElse(k, default)))
+  }
+
+  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+    // TODO(mgummelt): Don't do this here.  This should be passed as a --conf
+    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+    )
+
+    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
+
+    val envBuilder = Environment.newBuilder()
+    env.foreach { case (k, v) =>
+      envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
+    }
+    envBuilder.build()
+  }
+
+  private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
+    val confUris = List(conf.getOption("spark.mesos.uris"),
+      desc.conf.getOption("spark.mesos.uris"),
+      desc.conf.getOption("spark.submit.pyFiles")).flatMap(
+      _.map(_.split(",").map(_.trim))
+    ).flatten
+
+    val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
+
+    ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
+      CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
+  }
+
+  private def getDriverCommandValue(desc: MesosDriverDescription): String = {
+    val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
+    val executorUri = getDriverExecutorURI(desc)
+    // Gets the path to run spark-submit, and the path to the Mesos sandbox.
+    val (executable, sandboxPath) = if (dockerDefined) {
+      // Application jar is automatically downloaded in the mounted sandbox by Mesos,
+      // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
+      ("./bin/spark-submit", "$MESOS_SANDBOX")
+    } else if (executorUri.isDefined) {
+      val folderBasename = executorUri.get.split('/').last.split('.').head
+
+      val entries = conf.getOption("spark.executor.extraLibraryPath")
+        .map(path => Seq(path) ++ desc.command.libraryPathEntries)
+        .getOrElse(desc.command.libraryPathEntries)
+
+      val prefixEnv = if (!entries.isEmpty) Utils.libraryPathEnvPrefix(entries) else ""
+
+      val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
+      // Sandbox path points to the parent folder as we chdir into the folderBasename.
+      (cmdExecutable, "..")
+    } else {
+      val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
+        .orElse(conf.getOption("spark.home"))
+        .orElse(Option(System.getenv("SPARK_HOME")))
+        .getOrElse {
+          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+        }
+      val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath
+      // Sandbox points to the current directory by default with Mesos.
+      (cmdExecutable, ".")
+    }
+    val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
+    val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
+    val appArguments = desc.command.arguments.mkString(" ")
+
+    s"$executable $cmdOptions $primaryResource $appArguments"
+  }
+
+  private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
+    val builder = CommandInfo.newBuilder()
+    builder.setValue(getDriverCommandValue(desc))
+    builder.setEnvironment(getDriverEnvironment(desc))
+    builder.addAllUris(getDriverUris(desc).asJava)
+    builder.build()
+  }
+
+  private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
+    var options = Seq(
+      "--name", desc.conf.get("spark.app.name"),
+      "--master", s"mesos://${conf.get("spark.master")}",
+      "--driver-cores", desc.cores.toString,
+      "--driver-memory", s"${desc.mem}M")
+
+    // Assume empty main class means we're running python
+    if (!desc.command.mainClass.equals("")) {
+      options ++= Seq("--class", desc.command.mainClass)
+    }
+
+    desc.conf.getOption("spark.executor.memory").foreach { v =>
+      options ++= Seq("--executor-memory", v)
+    }
+    desc.conf.getOption("spark.cores.max").foreach { v =>
+      options ++= Seq("--total-executor-cores", v)
+    }
+    desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
+      val formattedFiles = pyFiles.split(",")
+        .map { path => new File(sandboxPath, path.split("/").last).toString() }
+        .mkString(",")
+      options ++= Seq("--py-files", formattedFiles)
+    }
+
+    // --conf
+    val replicatedOptionsBlacklist = Set(
+      "spark.jars", // Avoids duplicate classes in classpath
+      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+      "spark.master" // this contains the address of the dispatcher, not master
+    )
+    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+    val driverConf = desc.conf.getAll
+      .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
+      .toMap
+    (defaultConf ++ driverConf).foreach { case (key, value) =>
+      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+
+    options
+  }
+
+  /**
+   * Escape args for Unix-like shells, unless already quoted by the user.
+   * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
+   * and http://www.grymoire.com/Unix/Quote.html
+ *
+   * @param value argument
+   * @return escaped argument
+   */
+  private[scheduler] def shellEscape(value: String): String = {
+    val WrappedInQuotes = """^(".+"|'.+')$""".r
+    val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r
+    value match {
+      case WrappedInQuotes(c) => value // The user quoted his args, don't touch it!
+      case ShellSpecialChars(c) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\""
+      case _: String => value // Don't touch harmless strings
+    }
+  }
+
+  private class ResourceOffer(
+      val offerId: OfferID,
+      val slaveId: SlaveID,
+      var resources: JList[Resource]) {
+    override def toString(): String = {
+      s"Offer id: ${offerId}, resources: ${resources}"
+    }
+  }
+
+  private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
+    val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
+
+    val (remainingResources, cpuResourcesToUse) =
+      partitionResources(offer.resources, "cpus", desc.cores)
+    val (finalResources, memResourcesToUse) =
+      partitionResources(remainingResources.asJava, "mem", desc.mem)
+    offer.resources = finalResources.asJava
+
+    val appName = desc.conf.get("spark.app.name")
+    val taskInfo = TaskInfo.newBuilder()
+      .setTaskId(taskId)
+      .setName(s"Driver for ${appName}")
+      .setSlaveId(offer.slaveId)
+      .setCommand(buildDriverCommand(desc))
+      .addAllResources(cpuResourcesToUse.asJava)
+      .addAllResources(memResourcesToUse.asJava)
+
+    desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
+        desc.conf,
+        taskInfo.getContainerBuilder)
+    }
+
+    taskInfo.build
+  }
+
+  /**
+   * This method takes all the possible candidates and attempt to schedule them with Mesos offers.
+   * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
+   * logic on each task.
+   */
+  private def scheduleTasks(
+      candidates: Seq[MesosDriverDescription],
+      afterLaunchCallback: (String) => Boolean,
+      currentOffers: List[ResourceOffer],
+      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
+    for (submission <- candidates) {
+      val driverCpu = submission.cores
+      val driverMem = submission.mem
+      logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
+      val offerOption = currentOffers.find { o =>
+        getResource(o.resources, "cpus") >= driverCpu &&
+        getResource(o.resources, "mem") >= driverMem
+      }
+      if (offerOption.isEmpty) {
+        logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
+          s"cpu: $driverCpu, mem: $driverMem")
+      } else {
+        val offer = offerOption.get
+        val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
+        val task = createTaskInfo(submission, offer)
+        queuedTasks += task
+        logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
+          submission.submissionId)
+        val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
+          None, new Date(), None, getDriverFrameworkID(submission))
+        launchedDrivers(submission.submissionId) = newState
+        launchedDriversState.persist(submission.submissionId, newState)
+        afterLaunchCallback(submission.submissionId)
+      }
+    }
+  }
+
+  override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
+    logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
+    val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
+    val currentTime = new Date()
+
+    val currentOffers = offers.asScala.map {
+      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
+    }.toList
+
+    stateLock.synchronized {
+      // We first schedule all the supervised drivers that are ready to retry.
+      // This list will be empty if none of the drivers are marked as supervise.
+      val driversToRetry = pendingRetryDrivers.filter { d =>
+        d.retryState.get.nextRetry.before(currentTime)
+      }
+
+      scheduleTasks(
+        copyBuffer(driversToRetry),
+        removeFromPendingRetryDrivers,
+        currentOffers,
+        tasks)
+
+      // Then we walk through the queued drivers and try to schedule them.
+      scheduleTasks(
+        copyBuffer(queuedDrivers),
+        removeFromQueuedDrivers,
+        currentOffers,
+        tasks)
+    }
+    tasks.foreach { case (offerId, taskInfos) =>
+      driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
+    }
+
+    for (o <- currentOffers if !tasks.contains(o.offerId)) {
+      driver.declineOffer(o.offerId)
+    }
+  }
+
+  private def copyBuffer(
+      buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
+    val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
+    buffer.copyToBuffer(newBuffer)
+    newBuffer
+  }
+
+  def getSchedulerState(): MesosClusterSchedulerState = {
+    stateLock.synchronized {
+      new MesosClusterSchedulerState(
+        frameworkId,
+        masterInfo.map(m => s"http://${m.getIp}:${m.getPort}"),
+        copyBuffer(queuedDrivers),
+        launchedDrivers.values.map(_.copy()).toList,
+        finishedDrivers.map(_.copy()).toList,
+        copyBuffer(pendingRetryDrivers))
+    }
+  }
+
+  override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {}
+  override def disconnected(driver: SchedulerDriver): Unit = {}
+  override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = {
+    logInfo(s"Framework re-registered with master ${masterInfo.getId}")
+  }
+  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}
+  override def error(driver: SchedulerDriver, error: String): Unit = {
+    logError("Error received: " + error)
+    markErr()
+  }
+
+  /**
+   * Check if the task state is a recoverable state that we can relaunch the task.
+   * Task state like TASK_ERROR are not relaunchable state since it wasn't able
+   * to be validated by Mesos.
+   */
+  private def shouldRelaunch(state: MesosTaskState): Boolean = {
+    state == MesosTaskState.TASK_FAILED ||
+      state == MesosTaskState.TASK_KILLED ||
+      state == MesosTaskState.TASK_LOST
+  }
+
+  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
+    val taskId = status.getTaskId.getValue
+    stateLock.synchronized {
+      if (launchedDrivers.contains(taskId)) {
+        if (status.getReason == Reason.REASON_RECONCILIATION &&
+          !pendingRecover.contains(taskId)) {
+          // Task has already received update and no longer requires reconciliation.
+          return
+        }
+        val state = launchedDrivers(taskId)
+        // Check if the driver is supervise enabled and can be relaunched.
+        if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
+          removeFromLaunchedDrivers(taskId)
+          state.finishDate = Some(new Date())
+          val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
+          val (retries, waitTimeSec) = retryState
+            .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
+            .getOrElse{ (1, 1) }
+          val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
+
+          val newDriverDescription = state.driverDescription.copy(
+            retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
+          pendingRetryDrivers += newDriverDescription
+          pendingRetryDriversState.persist(taskId, newDriverDescription)
+        } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
+          removeFromLaunchedDrivers(taskId)
+          state.finishDate = Some(new Date())
+          if (finishedDrivers.size >= retainedDrivers) {
+            val toRemove = math.max(retainedDrivers / 10, 1)
+            finishedDrivers.trimStart(toRemove)
+          }
+          finishedDrivers += state
+        }
+        state.mesosTaskStatus = Option(status)
+      } else {
+        logError(s"Unable to find driver $taskId in status update")
+      }
+    }
+  }
+
+  override def frameworkMessage(
+      driver: SchedulerDriver,
+      executorId: ExecutorID,
+      slaveId: SlaveID,
+      message: Array[Byte]): Unit = {}
+
+  override def executorLost(
+      driver: SchedulerDriver,
+      executorId: ExecutorID,
+      slaveId: SlaveID,
+      status: Int): Unit = {}
+
+  private def removeFromQueuedDrivers(id: String): Boolean = {
+    val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
+    if (index != -1) {
+      queuedDrivers.remove(index)
+      queuedDriversState.expunge(id)
+      true
+    } else {
+      false
+    }
+  }
+
+  private def removeFromLaunchedDrivers(id: String): Boolean = {
+    if (launchedDrivers.remove(id).isDefined) {
+      launchedDriversState.expunge(id)
+      true
+    } else {
+      false
+    }
+  }
+
+  private def removeFromPendingRetryDrivers(id: String): Boolean = {
+    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
+    if (index != -1) {
+      pendingRetryDrivers.remove(index)
+      pendingRetryDriversState.expunge(id)
+      true
+    } else {
+      false
+    }
+  }
+
+  def getQueuedDriversSize: Int = queuedDrivers.size
+  def getLaunchedDriversSize: Int = launchedDrivers.size
+  def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
new file mode 100644
index 0000000..1fe9497
--- /dev/null
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler)
+  extends Source {
+  override def sourceName: String = "mesos_cluster"
+  override def metricRegistry: MetricRegistry = new MetricRegistry()
+
+  metricRegistry.register(MetricRegistry.name("waitingDrivers"), new Gauge[Int] {
+    override def getValue: Int = scheduler.getQueuedDriversSize
+  })
+
+  metricRegistry.register(MetricRegistry.name("launchedDrivers"), new Gauge[Int] {
+    override def getValue: Int = scheduler.getLaunchedDriversSize
+  })
+
+  metricRegistry.register(MetricRegistry.name("retryDrivers"), new Gauge[Int] {
+    override def getValue: Int = scheduler.getPendingRetryDriversSize
+  })
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
new file mode 100644
index 0000000..fde1fb3
--- /dev/null
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{Collections, List => JList}
+import java.util.concurrent.locks.ReentrantLock
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+
+import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+/**
+ * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
+ * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
+ * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
+ * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
+ * latency.
+ *
+ * Unfortunately this has a bit of duplication from [[MesosFineGrainedSchedulerBackend]],
+ * but it seems hard to remove this.
+ */
+private[spark] class MesosCoarseGrainedSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext,
+    master: String,
+    securityManager: SecurityManager)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
+  with org.apache.mesos.Scheduler
+  with MesosSchedulerUtils {
+
+  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
+
+  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
+  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+
+  private[this] val shutdownTimeoutMS =
+    conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
+      .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
+
+  // Synchronization protected by stateLock
+  private[this] var stopCalled: Boolean = false
+
+  // If shuffle service is enabled, the Spark driver will register with the shuffle service.
+  // This is for cleaning up shuffle files reliably.
+  private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+
+  // Cores we have acquired with each Mesos task ID
+  val coresByTaskId = new mutable.HashMap[String, Int]
+  var totalCoresAcquired = 0
+
+  // SlaveID -> Slave
+  // This map accumulates entries for the duration of the job.  Slaves are never deleted, because
+  // we need to maintain e.g. failure state and connection state.
+  private val slaves = new mutable.HashMap[String, Slave]
+
+  /**
+   * The total number of executors we aim to have. Undefined when not using dynamic allocation.
+   * Initially set to 0 when using dynamic allocation, the executor allocation manager will send
+   * the real initial limit later.
+   */
+  private var executorLimitOption: Option[Int] = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      Some(0)
+    } else {
+      None
+    }
+  }
+
+  /**
+   *  Return the current executor limit, which may be [[Int.MaxValue]]
+   *  before properly initialized.
+   */
+  private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
+
+  // private lock object protecting mutable state above. Using the intrinsic lock
+  // may lead to deadlocks since the superclass might also try to lock
+  private val stateLock = new ReentrantLock
+
+  val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
+
+  // Offer constraints
+  private val slaveOfferConstraints =
+    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
+  // Reject offers with mismatched constraints in seconds
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
+  // Reject offers when we reached the maximum number of cores for this framework
+  private val rejectOfferDurationForReachedMaxCores =
+    getRejectOfferDurationForReachedMaxCores(sc)
+
+  // A client for talking to the external shuffle service
+  private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
+    if (shuffleServiceEnabled) {
+      Some(getShuffleClient())
+    } else {
+      None
+    }
+  }
+
+  // This method is factored out for testability
+  protected def getShuffleClient(): MesosExternalShuffleClient = {
+    new MesosExternalShuffleClient(
+      SparkTransportConf.fromSparkConf(conf, "shuffle"),
+      securityManager,
+      securityManager.isAuthenticationEnabled(),
+      securityManager.isSaslEncryptionEnabled())
+  }
+
+  var nextMesosTaskId = 0
+
+  @volatile var appId: String = _
+
+  def newMesosTaskId(): String = {
+    val id = nextMesosTaskId
+    nextMesosTaskId += 1
+    id.toString
+  }
+
+  override def start() {
+    super.start()
+    val driver = createSchedulerDriver(
+      master,
+      MesosCoarseGrainedSchedulerBackend.this,
+      sc.sparkUser,
+      sc.appName,
+      sc.conf,
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
+      None,
+      None,
+      sc.conf.getOption("spark.mesos.driver.frameworkId")
+    )
+
+    unsetFrameworkID(sc)
+    startScheduler(driver)
+  }
+
+  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
+    val environment = Environment.newBuilder()
+    val extraClassPath = conf.getOption("spark.executor.extraClassPath")
+    extraClassPath.foreach { cp =>
+      environment.addVariables(
+        Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
+    }
+    val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
+
+    // Set the environment variable through a command prefix
+    // to append to the existing value of the variable
+    val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
+      Utils.libraryPathEnvPrefix(Seq(p))
+    }.getOrElse("")
+
+    environment.addVariables(
+      Environment.Variable.newBuilder()
+        .setName("SPARK_EXECUTOR_OPTS")
+        .setValue(extraJavaOpts)
+        .build())
+
+    sc.executorEnvs.foreach { case (key, value) =>
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName(key)
+        .setValue(value)
+        .build())
+    }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+
+    val uri = conf.getOption("spark.executor.uri")
+      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+
+    if (uri.isEmpty) {
+      val executorSparkHome = conf.getOption("spark.mesos.executor.home")
+        .orElse(sc.getSparkHome())
+        .getOrElse {
+          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+        }
+      val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
+      command.setValue(
+        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
+          .format(prefixEnv, runScript) +
+        s" --driver-url $driverURL" +
+        s" --executor-id $taskId" +
+        s" --hostname ${offer.getHostname}" +
+        s" --cores $numCores" +
+        s" --app-id $appId")
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = uri.get.split('/').last.split('.').head
+      command.setValue(
+        s"cd $basename*; $prefixEnv " +
+        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
+        s" --driver-url $driverURL" +
+        s" --executor-id $taskId" +
+        s" --hostname ${offer.getHostname}" +
+        s" --cores $numCores" +
+        s" --app-id $appId")
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
+    }
+
+    conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
+
+    command.build()
+  }
+
+  protected def driverURL: String = {
+    if (conf.contains("spark.testing")) {
+      "driverURL"
+    } else {
+      RpcEndpointAddress(
+        conf.get("spark.driver.host"),
+        conf.get("spark.driver.port").toInt,
+        CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+    }
+  }
+
+  override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
+
+  override def registered(
+      d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    appId = frameworkId.getValue
+    mesosExternalShuffleClient.foreach(_.init(appId))
+    markRegistered()
+  }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalCoresAcquired >= maxCores * minRegisteredRatio
+  }
+
+  override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
+
+  override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
+
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+   * unless we've already launched more than we wanted to.
+   */
+  override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
+    stateLock.synchronized {
+      if (stopCalled) {
+        logDebug("Ignoring offers during shutdown")
+        // Driver should simply return a stopped status on race
+        // condition between this.stop() and completing here
+        offers.asScala.map(_.getId).foreach(d.declineOffer)
+        return
+      }
+
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
+        val offerAttributes = toAttributeMap(offer.getAttributesList)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(
+      d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
+    offers.foreach { offer =>
+      declineOffer(d, offer, Some("unmet constraints"),
+        Some(rejectOfferDurationForUnmetConstraints))
+    }
+  }
+
+  private def declineOffer(
+      d: org.apache.mesos.SchedulerDriver,
+      offer: Offer,
+      reason: Option[String] = None,
+      refuseSeconds: Option[Long] = None): Unit = {
+
+    val id = offer.getId.getValue
+    val offerAttributes = toAttributeMap(offer.getAttributesList)
+    val mem = getResource(offer.getResourcesList, "mem")
+    val cpus = getResource(offer.getResourcesList, "cpus")
+    val ports = getRangeResource(offer.getResourcesList, "ports")
+
+    logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
+      s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
+      reason.map(r => s" (reason: $r)").getOrElse(""))
+
+    refuseSeconds match {
+      case Some(seconds) =>
+        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+        d.declineOffer(offer.getId, filters)
+      case _ => d.declineOffer(offer.getId)
+    }
+  }
+
+  /**
+   * Launches executors on accepted offers, and declines unused offers. Executors are launched
+   * round-robin on offers.
+   *
+   * @param d SchedulerDriver
+   * @param offers Mesos offers that match attribute constraints
+   */
+  private def handleMatchedOffers(
+      d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
+    val tasks = buildMesosTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val offerPorts = getRangeResource(offer.getResourcesList, "ports")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
+          s"  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+          val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
+            s" ports: $ports")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava)
+      } else if (totalCoresAcquired >= maxCores) {
+        // Reject an offer for a configurable amount of time to avoid starving other frameworks
+        declineOffer(d, offer, Some("reached spark.cores.max"),
+          Some(rejectOfferDurationForReachedMaxCores))
+      } else {
+        declineOffer(d, offer)
+      }
+    }
+  }
+
+  /**
+   * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
+   * per-task memory and IO, tasks are round-robin assigned to offers.
+   *
+   * @param offers Mesos offers that match attribute constraints
+   * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+   */
+  private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
+    // offerID -> tasks
+    val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+    // offerID -> resources
+    val remainingResources = mutable.Map(offers.map(offer =>
+      (offer.getId.getValue, offer.getResourcesList)): _*)
+
+    var launchTasks = true
+
+    // TODO(mgummelt): combine offers for a single slave
+    //
+    // round-robin create executors on the available offers
+    while (launchTasks) {
+      launchTasks = false
+
+      for (offer <- offers) {
+        val slaveId = offer.getSlaveId.getValue
+        val offerId = offer.getId.getValue
+        val resources = remainingResources(offerId)
+
+        if (canLaunchTask(slaveId, resources)) {
+          // Create a task
+          launchTasks = true
+          val taskId = newMesosTaskId()
+          val offerCPUs = getResource(resources, "cpus").toInt
+
+          val taskCPUs = executorCores(offerCPUs)
+          val taskMemory = executorMemory(sc)
+
+          slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
+
+          val (resourcesLeft, resourcesToUse) =
+            partitionTaskResources(resources, taskCPUs, taskMemory)
+
+          val taskBuilder = MesosTaskInfo.newBuilder()
+            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setSlaveId(offer.getSlaveId)
+            .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
+            .setName("Task " + taskId)
+
+          taskBuilder.addAllResources(resourcesToUse.asJava)
+
+          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+            MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+              image,
+              sc.conf,
+              taskBuilder.getContainerBuilder
+            )
+          }
+
+          tasks(offer.getId) ::= taskBuilder.build()
+          remainingResources(offerId) = resourcesLeft.asJava
+          totalCoresAcquired += taskCPUs
+          coresByTaskId(taskId) = taskCPUs
+        }
+      }
+    }
+    tasks.toMap
+  }
+
+  /** Extracts task needed resources from a list of available resources. */
+  private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
+    : (List[Resource], List[Resource]) = {
+
+    // partition cpus & mem
+    val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
+    val (afterMemResources, memResourcesToUse) =
+      partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+
+    // If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
+    // on the same host. This essentially means one executor per host.
+    // TODO: handle network isolator case
+    val (nonPortResources, portResourcesToUse) =
+      partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
+
+    (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
+  }
+
+  private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
+    val offerMem = getResource(resources, "mem")
+    val offerCPUs = getResource(resources, "cpus").toInt
+    val cpus = executorCores(offerCPUs)
+    val mem = executorMemory(sc)
+    val ports = getRangeResource(resources, "ports")
+    val meetsPortRequirements = checkPorts(sc.conf, ports)
+
+    cpus > 0 &&
+      cpus <= offerCPUs &&
+      cpus + totalCoresAcquired <= maxCores &&
+      mem <= offerMem &&
+      numExecutors() < executorLimit &&
+      slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
+      meetsPortRequirements
+  }
+
+  private def executorCores(offerCPUs: Int): Int = {
+    sc.conf.getInt("spark.executor.cores",
+      math.min(offerCPUs, maxCores - totalCoresAcquired))
+  }
+
+  override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
+    val taskId = status.getTaskId.getValue
+    val slaveId = status.getSlaveId.getValue
+    val state = mesosToTaskState(status.getState)
+
+    logInfo(s"Mesos task $taskId is now ${status.getState}")
+
+    stateLock.synchronized {
+      val slave = slaves(slaveId)
+
+      // If the shuffle service is enabled, have the driver register with each one of the
+      // shuffle services. This allows the shuffle services to clean up state associated with
+      // this application when the driver exits. There is currently not a great way to detect
+      // this through Mesos, since the shuffle services are set up independently.
+      if (state.equals(TaskState.RUNNING) &&
+          shuffleServiceEnabled &&
+          !slave.shuffleRegistered) {
+        assume(mesosExternalShuffleClient.isDefined,
+          "External shuffle client was not instantiated even though shuffle service is enabled.")
+        // TODO: Remove this and allow the MesosExternalShuffleService to detect
+        // framework termination when new Mesos Framework HTTP API is available.
+        val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
+
+        logDebug(s"Connecting to shuffle service on slave $slaveId, " +
+            s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
+
+        mesosExternalShuffleClient.get
+          .registerDriverWithShuffleService(
+            slave.hostname,
+            externalShufflePort,
+            sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
+              s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
+            sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+        slave.shuffleRegistered = true
+      }
+
+      if (TaskState.isFinished(state)) {
+        // Remove the cores we have remembered for this task, if it's in the hashmap
+        for (cores <- coresByTaskId.get(taskId)) {
+          totalCoresAcquired -= cores
+          coresByTaskId -= taskId
+        }
+        // If it was a failure, mark the slave as failed for blacklisting purposes
+        if (TaskState.isFailed(state)) {
+          slave.taskFailures += 1
+
+          if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
+            logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
+                "is Spark installed on it?")
+          }
+        }
+        executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
+        // In case we'd rejected everything before but have now lost a node
+        d.reviveOffers()
+      }
+    }
+  }
+
+  override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
+    logError(s"Mesos error: $message")
+    scheduler.error(message)
+  }
+
+  override def stop() {
+    // Make sure we're not launching tasks during shutdown
+    stateLock.synchronized {
+      if (stopCalled) {
+        logWarning("Stop called multiple times, ignoring")
+        return
+      }
+      stopCalled = true
+      super.stop()
+    }
+
+    // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
+    // See SPARK-12330
+    val startTime = System.nanoTime()
+
+    // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
+    while (numExecutors() > 0 &&
+      System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) {
+      Thread.sleep(100)
+    }
+
+    if (numExecutors() > 0) {
+      logWarning(s"Timed out waiting for ${numExecutors()} remaining executors "
+        + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files "
+        + "on the mesos nodes.")
+    }
+
+    // Close the mesos external shuffle client if used
+    mesosExternalShuffleClient.foreach(_.close())
+
+    if (mesosDriver != null) {
+      mesosDriver.stop()
+    }
+  }
+
+  override def frameworkMessage(
+      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  /**
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
+   */
+  private def executorTerminated(
+      d: org.apache.mesos.SchedulerDriver,
+      slaveId: String,
+      taskId: String,
+      reason: String): Unit = {
+    stateLock.synchronized {
+      // Do not call removeExecutor() after this scheduler backend was stopped because
+      // removeExecutor() internally will send a message to the driver endpoint but
+      // the driver endpoint is not available now, otherwise an exception will be thrown.
+      if (!stopCalled) {
+        removeExecutor(taskId, SlaveLost(reason))
+      }
+      slaves(slaveId).taskIDs.remove(taskId)
+    }
+  }
+
+  override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = {
+    logInfo(s"Mesos slave lost: ${slaveId.getValue}")
+  }
+
+  override def executorLost(
+      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
+    logInfo("Mesos executor lost: %s".format(e.getValue))
+  }
+
+  override def applicationId(): String =
+    Option(appId).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
+
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    // We don't truly know if we can fulfill the full amount of executors
+    // since at coarse grain it depends on the amount of slaves available.
+    logInfo("Capping the total amount of executors to " + requestedTotal)
+    executorLimitOption = Some(requestedTotal)
+    true
+  }
+
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    if (mesosDriver == null) {
+      logWarning("Asked to kill executors before the Mesos driver was started.")
+      false
+    } else {
+      for (executorId <- executorIds) {
+        val taskId = TaskID.newBuilder().setValue(executorId).build()
+        mesosDriver.killTask(taskId)
+      }
+      // no need to adjust `executorLimitOption` since the AllocationManager already communicated
+      // the desired limit through a call to `doRequestTotalExecutors`.
+      // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
+      true
+    }
+  }
+
+  private def numExecutors(): Int = {
+    slaves.values.map(_.taskIDs.size).sum
+  }
+}
+
+private class Slave(val hostname: String) {
+  val taskIDs = new mutable.HashSet[String]()
+  var taskFailures = 0
+  var shuffleRegistered = false
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
new file mode 100644
index 0000000..eb3b235
--- /dev/null
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{ArrayList => JArrayList, Collections, List => JList}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, HashSet}
+
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.protobuf.ByteString
+
+import org.apache.spark.{SparkContext, SparkException, TaskState}
+import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.util.Utils
+
+/**
+ * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
+ * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
+ * from multiple apps can run on different cores) and in time (a core can switch ownership).
+ */
+private[spark] class MesosFineGrainedSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext,
+    master: String)
+  extends SchedulerBackend
+  with org.apache.mesos.Scheduler
+  with MesosSchedulerUtils {
+
+  // Stores the slave ids that has launched a Mesos executor.
+  val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
+  val taskIdToSlaveId = new HashMap[Long, String]
+
+  // An ExecutorInfo for our tasks
+  var execArgs: Array[Byte] = null
+
+  var classLoader: ClassLoader = null
+
+  // The listener bus to publish executor added/removed events.
+  val listenerBus = sc.listenerBus
+
+  private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
+
+  // Offer constraints
+  private[this] val slaveOfferConstraints =
+    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
+  // reject offers with mismatched constraints in seconds
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
+  @volatile var appId: String = _
+
+  override def start() {
+    classLoader = Thread.currentThread.getContextClassLoader
+    val driver = createSchedulerDriver(
+      master,
+      MesosFineGrainedSchedulerBackend.this,
+      sc.sparkUser,
+      sc.appName,
+      sc.conf,
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
+      Option.empty,
+      Option.empty,
+      sc.conf.getOption("spark.mesos.driver.frameworkId")
+    )
+
+    unsetFrameworkID(sc)
+    startScheduler(driver)
+  }
+
+  /**
+   * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+   * @param availableResources Available resources that is offered by Mesos
+   * @param execId The executor id to assign to this new executor.
+   * @return A tuple of the new mesos executor info and the remaining available resources.
+   */
+  def createExecutorInfo(
+      availableResources: JList[Resource],
+      execId: String): (MesosExecutorInfo, JList[Resource]) = {
+    val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
+      .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
+      .getOrElse {
+      throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+    }
+    val environment = Environment.newBuilder()
+    sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
+      environment.addVariables(
+        Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
+    }
+    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
+
+    val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
+      Utils.libraryPathEnvPrefix(Seq(p))
+    }.getOrElse("")
+
+    environment.addVariables(
+      Environment.Variable.newBuilder()
+        .setName("SPARK_EXECUTOR_OPTS")
+        .setValue(extraJavaOpts)
+        .build())
+    sc.executorEnvs.foreach { case (key, value) =>
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName(key)
+        .setValue(value)
+        .build())
+    }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val uri = sc.conf.getOption("spark.executor.uri")
+      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+
+    val executorBackendName = classOf[MesosExecutorBackend].getName
+    if (uri.isEmpty) {
+      val executorPath = new File(executorSparkHome, "/bin/spark-class").getPath
+      command.setValue(s"$prefixEnv $executorPath $executorBackendName")
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = uri.get.split('/').last.split('.').head
+      command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
+    }
+    val builder = MesosExecutorInfo.newBuilder()
+    val (resourcesAfterCpu, usedCpuResources) =
+      partitionResources(availableResources, "cpus", mesosExecutorCores)
+    val (resourcesAfterMem, usedMemResources) =
+      partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))
+
+    builder.addAllResources(usedCpuResources.asJava)
+    builder.addAllResources(usedMemResources.asJava)
+
+    sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
+
+    val executorInfo = builder
+      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
+      .setCommand(command)
+      .setData(ByteString.copyFrom(createExecArg()))
+
+    sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+        image,
+        sc.conf,
+        executorInfo.getContainerBuilder()
+      )
+    }
+
+    (executorInfo.build(), resourcesAfterMem.asJava)
+  }
+
+  /**
+   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
+   * containing all the spark.* system properties in the form of (String, String) pairs.
+   */
+  private def createExecArg(): Array[Byte] = {
+    if (execArgs == null) {
+      val props = new HashMap[String, String]
+      for ((key, value) <- sc.conf.getAll) {
+        props(key) = value
+      }
+      // Serialize the map as an array of (String, String) pairs
+      execArgs = Utils.serialize(props.toArray)
+    }
+    execArgs
+  }
+
+  override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
+
+  override def registered(
+      d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    inClassLoader() {
+      appId = frameworkId.getValue
+      logInfo("Registered as framework ID " + appId)
+      markRegistered()
+    }
+  }
+
+  private def inClassLoader()(fun: => Unit) = {
+    val oldClassLoader = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(classLoader)
+    try {
+      fun
+    } finally {
+      Thread.currentThread.setContextClassLoader(oldClassLoader)
+    }
+  }
+
+  override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
+
+  override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
+
+  private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
+    val builder = new StringBuilder
+    tasks.asScala.foreach { t =>
+      builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
+        .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
+        .append("Task resources: ").append(t.getResourcesList).append("\n")
+        .append("Executor resources: ").append(t.getExecutor.getResourcesList)
+        .append("---------------------------------------------\n")
+    }
+    builder.toString()
+  }
+
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
+   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
+   * tasks are balanced across the cluster.
+   */
+  override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
+    inClassLoader() {
+      // Fail first on offers with unmet constraints
+      val (offersMatchingConstraints, offersNotMatchingConstraints) =
+        offers.asScala.partition { o =>
+          val offerAttributes = toAttributeMap(o.getAttributesList)
+          val meetsConstraints =
+            matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+
+          // add some debug messaging
+          if (!meetsConstraints) {
+            val id = o.getId.getValue
+            logDebug(s"Declining offer: $id with attributes: $offerAttributes")
+          }
+
+          meetsConstraints
+        }
+
+      // These offers do not meet constraints. We don't need to see them again.
+      // Decline the offer for a long period of time.
+      offersNotMatchingConstraints.foreach { o =>
+        d.declineOffer(o.getId, Filters.newBuilder()
+          .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
+      }
+
+      // Of the matching constraints, see which ones give us enough memory and cores
+      val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
+        val mem = getResource(o.getResourcesList, "mem")
+        val cpus = getResource(o.getResourcesList, "cpus")
+        val slaveId = o.getSlaveId.getValue
+        val offerAttributes = toAttributeMap(o.getAttributesList)
+
+        // check offers for
+        //  1. Memory requirements
+        //  2. CPU requirements - need at least 1 for executor, 1 for task
+        val meetsMemoryRequirements = mem >= executorMemory(sc)
+        val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
+        val meetsRequirements =
+          (meetsMemoryRequirements && meetsCPURequirements) ||
+          (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+        val debugstr = if (meetsRequirements) "Accepting" else "Declining"
+        logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+          + s"$offerAttributes mem: $mem cpu: $cpus")
+
+        meetsRequirements
+      }
+
+      // Decline offers we ruled out immediately
+      unUsableOffers.foreach(o => d.declineOffer(o.getId))
+
+      val workerOffers = usableOffers.map { o =>
+        val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
+          getResource(o.getResourcesList, "cpus").toInt
+        } else {
+          // If the Mesos executor has not been started on this slave yet, set aside a few
+          // cores for the Mesos executor by offering fewer cores to the Spark executor
+          (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
+        }
+        new WorkerOffer(
+          o.getSlaveId.getValue,
+          o.getHostname,
+          cpus)
+      }
+
+      val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
+      val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
+      val slaveIdToResources = new HashMap[String, JList[Resource]]()
+      usableOffers.foreach { o =>
+        slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
+      }
+
+      val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
+
+      val slavesIdsOfAcceptedOffers = HashSet[String]()
+
+      // Call into the TaskSchedulerImpl
+      val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
+      acceptedOffers
+        .foreach { offer =>
+          offer.foreach { taskDesc =>
+            val slaveId = taskDesc.executorId
+            slavesIdsOfAcceptedOffers += slaveId
+            taskIdToSlaveId(taskDesc.taskId) = slaveId
+            val (mesosTask, remainingResources) = createMesosTask(
+              taskDesc,
+              slaveIdToResources(slaveId),
+              slaveId)
+            mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+              .add(mesosTask)
+            slaveIdToResources(slaveId) = remainingResources
+          }
+        }
+
+      // Reply to the offers
+      val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+
+      mesosTasks.foreach { case (slaveId, tasks) =>
+        slaveIdToWorkerOffer.get(slaveId).foreach(o =>
+          listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
+            // TODO: Add support for log urls for Mesos
+            new ExecutorInfo(o.host, o.cores, Map.empty)))
+        )
+        logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
+        d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
+      }
+
+      // Decline offers that weren't used
+      // NOTE: This logic assumes that we only get a single offer for each host in a given batch
+      for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
+        d.declineOffer(o.getId)
+      }
+    }
+  }
+
+  /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */
+  def createMesosTask(
+      task: TaskDescription,
+      resources: JList[Resource],
+      slaveId: String): (MesosTaskInfo, JList[Resource]) = {
+    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
+    val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) {
+      (slaveIdToExecutorInfo(slaveId), resources)
+    } else {
+      createExecutorInfo(resources, slaveId)
+    }
+    slaveIdToExecutorInfo(slaveId) = executorInfo
+    val (finalResources, cpuResources) =
+      partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
+    val taskInfo = MesosTaskInfo.newBuilder()
+      .setTaskId(taskId)
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+      .setExecutor(executorInfo)
+      .setName(task.name)
+      .addAllResources(cpuResources.asJava)
+      .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
+      .build()
+    (taskInfo, finalResources.asJava)
+  }
+
+  override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
+    inClassLoader() {
+      val tid = status.getTaskId.getValue.toLong
+      val state = mesosToTaskState(status.getState)
+      synchronized {
+        if (TaskState.isFailed(mesosToTaskState(status.getState))
+          && taskIdToSlaveId.contains(tid)) {
+          // We lost the executor on this slave, so remember that it's gone
+          removeExecutor(taskIdToSlaveId(tid), "Lost executor")
+        }
+        if (TaskState.isFinished(state)) {
+          taskIdToSlaveId.remove(tid)
+        }
+      }
+      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+    }
+  }
+
+  override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
+    inClassLoader() {
+      logError("Mesos error: " + message)
+      markErr()
+      scheduler.error(message)
+    }
+  }
+
+  override def stop() {
+    if (mesosDriver != null) {
+      mesosDriver.stop()
+    }
+  }
+
+  override def reviveOffers() {
+    mesosDriver.reviveOffers()
+  }
+
+  override def frameworkMessage(
+      d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  /**
+   * Remove executor associated with slaveId in a thread safe manner.
+   */
+  private def removeExecutor(slaveId: String, reason: String) = {
+    synchronized {
+      listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
+      slaveIdToExecutorInfo -= slaveId
+    }
+  }
+
+  private def recordSlaveLost(
+      d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
+    inClassLoader() {
+      logInfo("Mesos slave lost: " + slaveId.getValue)
+      removeExecutor(slaveId.getValue, reason.toString)
+      scheduler.executorLost(slaveId.getValue, reason)
+    }
+  }
+
+  override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID) {
+    recordSlaveLost(d, slaveId, SlaveLost())
+  }
+
+  override def executorLost(
+      d: org.apache.mesos.SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+                                                                 slaveId.getValue))
+    recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true))
+  }
+
+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
+    mesosDriver.killTask(
+      TaskID.newBuilder()
+        .setValue(taskId.toString).build()
+    )
+  }
+
+  // TODO: query Mesos for number of cores
+  override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8)
+
+  override def applicationId(): String =
+    Option(appId).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
new file mode 100644
index 0000000..3fe0674
--- /dev/null
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
+import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+
+/**
+ * A collection of utility functions which can be used by both the
+ * MesosSchedulerBackend and the [[MesosFineGrainedSchedulerBackend]].
+ */
+private[mesos] object MesosSchedulerBackendUtil extends Logging {
+  /**
+   * Parse a comma-delimited list of volume specs, each of which
+   * takes the form [host-dir:]container-dir[:rw|:ro].
+   */
+  def parseVolumesSpec(volumes: String): List[Volume] = {
+    volumes.split(",").map(_.split(":")).flatMap { spec =>
+        val vol: Volume.Builder = Volume
+          .newBuilder()
+          .setMode(Volume.Mode.RW)
+        spec match {
+          case Array(container_path) =>
+            Some(vol.setContainerPath(container_path))
+          case Array(container_path, "rw") =>
+            Some(vol.setContainerPath(container_path))
+          case Array(container_path, "ro") =>
+            Some(vol.setContainerPath(container_path)
+              .setMode(Volume.Mode.RO))
+          case Array(host_path, container_path) =>
+            Some(vol.setContainerPath(container_path)
+              .setHostPath(host_path))
+          case Array(host_path, container_path, "rw") =>
+            Some(vol.setContainerPath(container_path)
+              .setHostPath(host_path))
+          case Array(host_path, container_path, "ro") =>
+            Some(vol.setContainerPath(container_path)
+              .setHostPath(host_path)
+              .setMode(Volume.Mode.RO))
+          case spec =>
+            logWarning(s"Unable to parse volume specs: $volumes. "
+              + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
+            None
+      }
+    }
+    .map { _.build() }
+    .toList
+  }
+
+  /**
+   * Parse a comma-delimited list of port mapping specs, each of which
+   * takes the form host_port:container_port[:udp|:tcp]
+   *
+   * Note:
+   * the docker form is [ip:]host_port:container_port, but the DockerInfo
+   * message has no field for 'ip', and instead has a 'protocol' field.
+   * Docker itself only appears to support TCP, so this alternative form
+   * anticipates the expansion of the docker form to allow for a protocol
+   * and leaves open the chance for mesos to begin to accept an 'ip' field
+   */
+  def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = {
+    portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] =>
+      val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping
+        .newBuilder()
+        .setProtocol("tcp")
+      spec match {
+        case Array(host_port, container_port) =>
+          Some(portmap.setHostPort(host_port.toInt)
+            .setContainerPort(container_port.toInt))
+        case Array(host_port, container_port, protocol) =>
+          Some(portmap.setHostPort(host_port.toInt)
+            .setContainerPort(container_port.toInt)
+            .setProtocol(protocol))
+        case spec =>
+          logWarning(s"Unable to parse port mapping specs: $portmaps. "
+            + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"")
+          None
+      }
+    }
+    .map { _.build() }
+    .toList
+  }
+
+  /**
+   * Construct a DockerInfo structure and insert it into a ContainerInfo
+   */
+  def addDockerInfo(
+      container: ContainerInfo.Builder,
+      image: String,
+      containerizer: String,
+      forcePullImage: Boolean = false,
+      volumes: Option[List[Volume]] = None,
+      portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
+
+    containerizer match {
+      case "docker" =>
+        container.setType(ContainerInfo.Type.DOCKER)
+        val docker = ContainerInfo.DockerInfo.newBuilder()
+          .setImage(image)
+          .setForcePullImage(forcePullImage)
+        // TODO (mgummelt): Remove this. Portmaps have no effect,
+        //                  as we don't support bridge networking.
+        portmaps.foreach(_.foreach(docker.addPortMappings))
+        container.setDocker(docker)
+      case "mesos" =>
+        container.setType(ContainerInfo.Type.MESOS)
+        val imageProto = Image.newBuilder()
+          .setType(Image.Type.DOCKER)
+          .setDocker(Image.Docker.newBuilder().setName(image))
+          .setCached(!forcePullImage)
+        container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
+      case _ =>
+        throw new SparkException(
+          "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
+    }
+
+    volumes.foreach(_.foreach(container.addVolumes))
+  }
+
+  /**
+   * Setup a docker containerizer from MesosDriverDescription scheduler properties
+   */
+  def setupContainerBuilderDockerInfo(
+    imageName: String,
+    conf: SparkConf,
+    builder: ContainerInfo.Builder): Unit = {
+    val forcePullImage = conf
+      .getOption("spark.mesos.executor.docker.forcePullImage")
+      .exists(_.equals("true"))
+    val volumes = conf
+      .getOption("spark.mesos.executor.docker.volumes")
+      .map(parseVolumesSpec)
+    val portmaps = conf
+      .getOption("spark.mesos.executor.docker.portmaps")
+      .map(parsePortMappingsSpec)
+
+    val containerizer = conf.get("spark.mesos.containerizer", "docker")
+    addDockerInfo(
+      builder,
+      imageName,
+      containerizer,
+      forcePullImage = forcePullImage,
+      volumes = volumes,
+      portmaps = portmaps)
+    logDebug("setupContainerDockerInfo: using docker image: " + imageName)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org