Posted to commits@spark.apache.org by an...@apache.org on 2016/02/10 19:53:37 UTC

spark git commit: [SPARK-5095][MESOS] Support launching multiple mesos executors in coarse grained mesos mode.

Repository: spark
Updated Branches:
  refs/heads/master c0b71e0b8 -> 80cb963ad


[SPARK-5095][MESOS] Support launching multiple mesos executors in coarse grained mesos mode.

This is the next iteration of tnachen's previous PR: https://github.com/apache/spark/pull/4027

In that PR, we agreed with andrewor14 and pwendell to implement the Mesos scheduler's support for `spark.executor.cores` so that it is consistent with YARN and Standalone. This PR implements that resolution.

This PR implements two high-level features. The two are co-dependent, so they are both implemented here (a brief configuration sketch follows the list):
- Mesos support for spark.executor.cores
- Multiple executors per slave
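
For context only (not part of this commit), here is a hypothetical, minimal sketch of how an application might exercise the two features above once this change is in place; the master URL, core counts, and memory size are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical settings: with this change, the coarse-grained Mesos backend
// honors spark.executor.cores, so a single 16-core slave offer can be split
// into several 4-core executors instead of one executor per slave.
val conf = new SparkConf()
  .setMaster("mesos://zk://placeholder:2181/mesos")  // placeholder master URL
  .setAppName("executor-sizing-example")
  .set("spark.executor.cores", "4")   // cores per executor (now honored on Mesos coarse-grained)
  .set("spark.cores.max", "16")       // total cores the application may acquire
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)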

We at Mesosphere have been working with Typesafe on a Spark/Mesos integration test suite: https://github.com/typesafehub/mesos-spark-integration-tests, which passes for this PR.

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Michael Gummelt <mg...@mesosphere.io>

Closes #10993 from mgummelt/executor_sizing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80cb963a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80cb963a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80cb963a

Branch: refs/heads/master
Commit: 80cb963ad963e26c3a7f8388bdd4ffd5e99aad1a
Parents: c0b71e0
Author: Michael Gummelt <mg...@mesosphere.io>
Authored: Wed Feb 10 10:53:33 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Feb 10 10:53:33 2016 -0800

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala |  11 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     | 375 ++++++++++++-------
 .../cluster/mesos/MesosSchedulerBackend.scala   |   4 +-
 .../cluster/mesos/MesosSchedulerUtils.scala     |  10 +-
 .../CoarseMesosSchedulerBackendSuite.scala      | 365 ++++++++++++------
 .../mesos/MesosSchedulerBackendSuite.scala      |   2 +-
 .../mesos/MesosSchedulerUtilsSuite.scala        |   6 +-
 docs/configuration.md                           |  15 +-
 docs/running-on-mesos.md                        |   8 +-
 9 files changed, 521 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f69a3d3..0a5b09d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -240,6 +240,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         else {
           val executorData = executorDataMap(task.executorId)
           executorData.freeCores -= scheduler.CPUS_PER_TASK
+
+          logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
+            s"${executorData.executorHost}.")
+
           executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
         }
       }
@@ -309,7 +313,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // TODO (prashant) send conf instead of properties
-    driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
+    driverEndpoint = createDriverEndpointRef(properties)
+  }
+
+  protected def createDriverEndpointRef(
+      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
+    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
   }
 
   protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {

http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 0a2d72f..98699e0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -23,17 +23,17 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.{Buffer, HashMap, HashSet}
 
 import com.google.common.base.Stopwatch
-import com.google.common.collect.HashBiMap
 import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 
-import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
+import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress}
+import org.apache.spark.rpc.{RpcEndpointAddress}
 import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -74,17 +74,13 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
 
   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[Int, Int]
+  val coresByTaskId = new HashMap[String, Int]
   var totalCoresAcquired = 0
 
-  val slaveIdsWithExecutors = new HashSet[String]
-
-  // Maping from slave Id to hostname
-  private val slaveIdToHost = new HashMap[String, String]
-
-  val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
-  // How many times tasks on each slave failed
-  val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
+  // SlaveID -> Slave
+  // This map accumulates entries for the duration of the job.  Slaves are never deleted, because
+  // we need to maintain e.g. failure state and connection state.
+  private val slaves = new HashMap[String, Slave]
 
   /**
    * The total number of executors we aim to have. Undefined when not using dynamic allocation.
@@ -105,13 +101,11 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
 
-  private val pendingRemovedSlaveIds = new HashSet[String]
-
   // private lock object protecting mutable state above. Using the intrinsic lock
   // may lead to deadlocks since the superclass might also try to lock
   private val stateLock = new ReentrantLock
 
-  val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
+  val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
 
   // Offer constraints
   private val slaveOfferConstraints =
@@ -121,27 +115,31 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val rejectOfferDurationForUnmetConstraints =
     getRejectOfferDurationForUnmetConstraints(sc)
 
-  // A client for talking to the external shuffle service, if it is a
+  // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
-      Some(new MesosExternalShuffleClient(
-        SparkTransportConf.fromSparkConf(conf, "shuffle"),
-        securityManager,
-        securityManager.isAuthenticationEnabled(),
-        securityManager.isSaslEncryptionEnabled()))
+      Some(getShuffleClient())
     } else {
       None
     }
   }
 
+  protected def getShuffleClient(): MesosExternalShuffleClient = {
+    new MesosExternalShuffleClient(
+      SparkTransportConf.fromSparkConf(conf, "shuffle"),
+      securityManager,
+      securityManager.isAuthenticationEnabled(),
+      securityManager.isSaslEncryptionEnabled())
+  }
+
   var nextMesosTaskId = 0
 
   @volatile var appId: String = _
 
-  def newMesosTaskId(): Int = {
+  def newMesosTaskId(): String = {
     val id = nextMesosTaskId
     nextMesosTaskId += 1
-    id
+    id.toString
   }
 
   override def start() {
@@ -156,7 +154,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     startScheduler(driver)
   }
 
-  def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
+  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
     val executorSparkHome = conf.getOption("spark.mesos.executor.home")
       .orElse(sc.getSparkHome())
       .getOrElse {
@@ -200,7 +198,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
           .format(prefixEnv, runScript) +
         s" --driver-url $driverURL" +
-        s" --executor-id ${offer.getSlaveId.getValue}" +
+        s" --executor-id $taskId" +
         s" --hostname ${offer.getHostname}" +
         s" --cores $numCores" +
         s" --app-id $appId")
@@ -208,12 +206,11 @@ private[spark] class CoarseMesosSchedulerBackend(
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.get.split('/').last.split('.').head
-      val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)
       command.setValue(
         s"cd $basename*; $prefixEnv " +
-         "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
+        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
         s" --driver-url $driverURL" +
-        s" --executor-id $executorId" +
+        s" --executor-id $taskId" +
         s" --hostname ${offer.getHostname}" +
         s" --cores $numCores" +
         s" --app-id $appId")
@@ -268,113 +265,209 @@ private[spark] class CoarseMesosSchedulerBackend(
         offers.asScala.map(_.getId).foreach(d.declineOffer)
         return
       }
-      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers.asScala) {
+
+      logDebug(s"Received ${offers.size} resource offers.")
+
+      val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
         val offerAttributes = toAttributeMap(offer.getAttributesList)
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+      }
+
+      declineUnmatchedOffers(d, unmatchedOffers)
+      handleMatchedOffers(d, matchedOffers)
+    }
+  }
+
+  private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    for (offer <- offers) {
+      val id = offer.getId.getValue
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val mem = getResource(offer.getResourcesList, "mem")
+      val cpus = getResource(offer.getResourcesList, "cpus")
+      val filters = Filters.newBuilder()
+        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
+
+      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+        + s" for $rejectOfferDurationForUnmetConstraints seconds")
+
+      d.declineOffer(offer.getId, filters)
+    }
+  }
+
+  /**
+    * Launches executors on accepted offers, and declines unused offers. Executors are launched
+    * round-robin on offers.
+    *
+    * @param d SchedulerDriver
+    * @param offers Mesos offers that match attribute constraints
+    */
+  private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+    val tasks = buildMesosTasks(offers)
+    for (offer <- offers) {
+      val offerAttributes = toAttributeMap(offer.getAttributesList)
+      val offerMem = getResource(offer.getResourcesList, "mem")
+      val offerCpus = getResource(offer.getResourcesList, "cpus")
+      val id = offer.getId.getValue
+
+      if (tasks.contains(offer.getId)) { // accept
+        val offerTasks = tasks(offer.getId)
+
+        logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus.  Launching ${offerTasks.size} Mesos tasks.")
+
+        for (task <- offerTasks) {
+          val taskId = task.getTaskId
+          val mem = getResource(task.getResourcesList, "mem")
+          val cpus = getResource(task.getResourcesList, "cpus")
+
+          logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+        }
+
+        d.launchTasks(
+          Collections.singleton(offer.getId),
+          offerTasks.asJava)
+      } else { // decline
+        logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
+          s"mem: $offerMem cpu: $offerCpus")
+
+        d.declineOffer(offer.getId)
+      }
+    }
+  }
+
+  /**
+    * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
+    * per-task memory and IO, tasks are round-robin assigned to offers.
+    *
+    * @param offers Mesos offers that match attribute constraints
+    * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+    */
+  private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
+    // offerID -> tasks
+    val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+
+    // offerID -> resources
+    val remainingResources = mutable.Map(offers.map(offer =>
+      (offer.getId.getValue, offer.getResourcesList)): _*)
+
+    var launchTasks = true
+
+    // TODO(mgummelt): combine offers for a single slave
+    //
+    // round-robin create executors on the available offers
+    while (launchTasks) {
+      launchTasks = false
+
+      for (offer <- offers) {
         val slaveId = offer.getSlaveId.getValue
-        val mem = getResource(offer.getResourcesList, "mem")
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        val id = offer.getId.getValue
-        if (meetsConstraints) {
-          if (taskIdToSlaveId.size < executorLimit &&
-              totalCoresAcquired < maxCores &&
-              mem >= calculateTotalMemory(sc) &&
-              cpus >= 1 &&
-              failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-              !slaveIdsWithExecutors.contains(slaveId)) {
-            // Launch an executor on the slave
-            val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-            totalCoresAcquired += cpusToUse
-            val taskId = newMesosTaskId()
-            taskIdToSlaveId.put(taskId, slaveId)
-            slaveIdsWithExecutors += slaveId
-            coresByTaskId(taskId) = cpusToUse
-            // Gather cpu resources from the available resources and use them in the task.
-            val (remainingResources, cpuResourcesToUse) =
-              partitionResources(offer.getResourcesList, "cpus", cpusToUse)
-            val (_, memResourcesToUse) =
-              partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
-            val taskBuilder = MesosTaskInfo.newBuilder()
-              .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-              .setSlaveId(offer.getSlaveId)
-              .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
-              .setName("Task " + taskId)
-              .addAllResources(cpuResourcesToUse.asJava)
-              .addAllResources(memResourcesToUse.asJava)
-
-            sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-              MesosSchedulerBackendUtil
-                .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
-            }
-
-            // Accept the offer and launch the task
-            logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
-            slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
-            d.launchTasks(
-              Collections.singleton(offer.getId),
-              Collections.singleton(taskBuilder.build()), filters)
-          } else {
-            // Decline the offer
-            logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
-            d.declineOffer(offer.getId)
+        val offerId = offer.getId.getValue
+        val resources = remainingResources(offerId)
+
+        if (canLaunchTask(slaveId, resources)) {
+          // Create a task
+          launchTasks = true
+          val taskId = newMesosTaskId()
+          val offerCPUs = getResource(resources, "cpus").toInt
+
+          val taskCPUs = executorCores(offerCPUs)
+          val taskMemory = executorMemory(sc)
+
+          slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
+
+          val (afterCPUResources, cpuResourcesToUse) =
+            partitionResources(resources, "cpus", taskCPUs)
+          val (resourcesLeft, memResourcesToUse) =
+            partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+
+          val taskBuilder = MesosTaskInfo.newBuilder()
+            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setSlaveId(offer.getSlaveId)
+            .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
+            .setName("Task " + taskId)
+            .addAllResources(cpuResourcesToUse.asJava)
+            .addAllResources(memResourcesToUse.asJava)
+
+          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+            MesosSchedulerBackendUtil
+              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
           }
-        } else {
-          // This offer does not meet constraints. We don't need to see it again.
-          // Decline the offer for a long period of time.
-          logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
-              + s" for $rejectOfferDurationForUnmetConstraints seconds")
-          d.declineOffer(offer.getId, Filters.newBuilder()
-            .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
+
+          tasks(offer.getId) ::= taskBuilder.build()
+          remainingResources(offerId) = resourcesLeft.asJava
+          totalCoresAcquired += taskCPUs
+          coresByTaskId(taskId) = taskCPUs
         }
       }
     }
+    tasks.toMap
+  }
+
+  private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
+    val offerMem = getResource(resources, "mem")
+    val offerCPUs = getResource(resources, "cpus").toInt
+    val cpus = executorCores(offerCPUs)
+    val mem = executorMemory(sc)
+
+    cpus > 0 &&
+      cpus <= offerCPUs &&
+      cpus + totalCoresAcquired <= maxCores &&
+      mem <= offerMem &&
+      numExecutors() < executorLimit &&
+      slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES
   }
 
+  private def executorCores(offerCPUs: Int): Int = {
+    sc.conf.getInt("spark.executor.cores",
+      math.min(offerCPUs, maxCores - totalCoresAcquired))
+  }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val taskId = status.getTaskId.getValue.toInt
-    val state = status.getState
-    logInfo(s"Mesos task $taskId is now $state")
-    val slaveId: String = status.getSlaveId.getValue
+    val taskId = status.getTaskId.getValue
+    val slaveId = status.getSlaveId.getValue
+    val state = TaskState.fromMesos(status.getState)
+
+    logInfo(s"Mesos task $taskId is now ${status.getState}")
+
     stateLock.synchronized {
+      val slave = slaves(slaveId)
+
       // If the shuffle service is enabled, have the driver register with each one of the
       // shuffle services. This allows the shuffle services to clean up state associated with
       // this application when the driver exits. There is currently not a great way to detect
       // this through Mesos, since the shuffle services are set up independently.
-      if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
-          slaveIdToHost.contains(slaveId) &&
-          shuffleServiceEnabled) {
+      if (state.equals(TaskState.RUNNING) &&
+          shuffleServiceEnabled &&
+          !slave.shuffleRegistered) {
         assume(mesosExternalShuffleClient.isDefined,
           "External shuffle client was not instantiated even though shuffle service is enabled.")
         // TODO: Remove this and allow the MesosExternalShuffleService to detect
         // framework termination when new Mesos Framework HTTP API is available.
         val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
-        val hostname = slaveIdToHost.remove(slaveId).get
+
         logDebug(s"Connecting to shuffle service on slave $slaveId, " +
-            s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
+            s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
+
         mesosExternalShuffleClient.get
-          .registerDriverWithShuffleService(hostname, externalShufflePort)
+          .registerDriverWithShuffleService(slave.hostname, externalShufflePort)
+        slave.shuffleRegistered = true
       }
 
-      if (TaskState.isFinished(TaskState.fromMesos(state))) {
-        val slaveId = taskIdToSlaveId.get(taskId)
-        slaveIdsWithExecutors -= slaveId
-        taskIdToSlaveId.remove(taskId)
+      if (TaskState.isFinished(state)) {
         // Remove the cores we have remembered for this task, if it's in the hashmap
         for (cores <- coresByTaskId.get(taskId)) {
           totalCoresAcquired -= cores
           coresByTaskId -= taskId
         }
         // If it was a failure, mark the slave as failed for blacklisting purposes
-        if (TaskState.isFailed(TaskState.fromMesos(state))) {
-          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
-          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+        if (TaskState.isFailed(state)) {
+          slave.taskFailures += 1
+
+          if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
             logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
                 "is Spark installed on it?")
           }
         }
-        executorTerminated(d, slaveId, s"Executor finished with state $state")
+        executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
         // In case we'd rejected everything before but have now lost a node
         d.reviveOffers()
       }
@@ -396,20 +489,24 @@ private[spark] class CoarseMesosSchedulerBackend(
       stopCalled = true
       super.stop()
     }
+
     // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
     // See SPARK-12330
     val stopwatch = new Stopwatch()
     stopwatch.start()
+
     // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
-    while (slaveIdsWithExecutors.nonEmpty &&
+    while (numExecutors() > 0 &&
       stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
       Thread.sleep(100)
     }
-    if (slaveIdsWithExecutors.nonEmpty) {
-      logWarning(s"Timed out waiting for ${slaveIdsWithExecutors.size} remaining executors "
+
+    if (numExecutors() > 0) {
+      logWarning(s"Timed out waiting for ${numExecutors()} remaining executors "
         + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files "
         + "on the mesos nodes.")
     }
+
     if (mesosDriver != null) {
       mesosDriver.stop()
     }
@@ -418,40 +515,25 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
   /**
-   * Called when a slave is lost or a Mesos task finished. Update local view on
-   * what tasks are running and remove the terminated slave from the list of pending
-   * slave IDs that we might have asked to be killed. It also notifies the driver
-   * that an executor was removed.
+   * Called when a slave is lost or a Mesos task finished. Updates local view on
+   * what tasks are running. It also notifies the driver that an executor was removed.
    */
-  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+  private def executorTerminated(d: SchedulerDriver,
+                                 slaveId: String,
+                                 taskId: String,
+                                 reason: String): Unit = {
     stateLock.synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId)) {
-        val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.containsKey(slaveId)) {
-          val taskId: Int = slaveIdToTaskId.get(slaveId)
-          taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
-        }
-        // TODO: This assumes one Spark executor per Mesos slave,
-        // which may no longer be true after SPARK-5095
-        pendingRemovedSlaveIds -= slaveId
-        slaveIdsWithExecutors -= slaveId
-      }
+      removeExecutor(taskId, SlaveLost(reason))
+      slaves(slaveId).taskIDs.remove(taskId)
     }
   }
 
-  private def sparkExecutorId(slaveId: String, taskId: String): String = {
-    s"$slaveId/$taskId"
-  }
-
   override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
     logInfo(s"Mesos slave lost: ${slaveId.getValue}")
-    executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
   }
 
   override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
-    slaveLost(d, s)
+    logInfo("Mesos executor lost: %s".format(e.getValue))
   }
 
   override def applicationId(): String =
@@ -471,23 +553,26 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def doKillExecutors(executorIds: Seq[String]): Boolean = {
     if (mesosDriver == null) {
       logWarning("Asked to kill executors before the Mesos driver was started.")
-      return false
-    }
-
-    val slaveIdToTaskId = taskIdToSlaveId.inverse()
-    for (executorId <- executorIds) {
-      val slaveId = executorId.split("/")(0)
-      if (slaveIdToTaskId.containsKey(slaveId)) {
-        mesosDriver.killTask(
-          TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
-        pendingRemovedSlaveIds += slaveId
-      } else {
-        logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+      false
+    } else {
+      for (executorId <- executorIds) {
+        val taskId = TaskID.newBuilder().setValue(executorId).build()
+        mesosDriver.killTask(taskId)
       }
+      // no need to adjust `executorLimitOption` since the AllocationManager already communicated
+      // the desired limit through a call to `doRequestTotalExecutors`.
+      // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
+      true
     }
-    // no need to adjust `executorLimitOption` since the AllocationManager already communicated
-    // the desired limit through a call to `doRequestTotalExecutors`.
-    // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
-    true
   }
+
+  private def numExecutors(): Int = {
+    slaves.values.map(_.taskIDs.size).sum
+  }
+}
+
+private class Slave(val hostname: String) {
+  val taskIDs = new HashSet[String]()
+  var taskFailures = 0
+  var shuffleRegistered = false
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 340f29b..8929d8a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -138,7 +138,7 @@ private[spark] class MesosSchedulerBackend(
     val (resourcesAfterCpu, usedCpuResources) =
       partitionResources(availableResources, "cpus", mesosExecutorCores)
     val (resourcesAfterMem, usedMemResources) =
-      partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
+      partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))
 
     builder.addAllResources(usedCpuResources.asJava)
     builder.addAllResources(usedMemResources.asJava)
@@ -250,7 +250,7 @@ private[spark] class MesosSchedulerBackend(
         // check offers for
         //  1. Memory requirements
         //  2. CPU requirements - need at least 1 for executor, 1 for task
-        val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+        val meetsMemoryRequirements = mem >= executorMemory(sc)
         val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
         val meetsRequirements =
           (meetsMemoryRequirements && meetsCPURequirements) ||

http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index f9f5da9..a98f2f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -140,15 +140,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  /**
-   * Signal that the scheduler has registered with Mesos.
-   */
-  protected def getResource(res: JList[Resource], name: String): Double = {
+  def getResource(res: JList[Resource], name: String): Double = {
     // A resource can have multiple values in the offer since it can either be from
     // a specific role or wildcard.
     res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
   }
 
+  /**
+    * Signal that the scheduler has registered with Mesos.
+    */
   protected def markRegistered(): Unit = {
     registerLatch.countDown()
   }
@@ -337,7 +337,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    * @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
    *         (whichever is larger)
    */
-  def calculateTotalMemory(sc: SparkContext): Int = {
+  def executorMemory(sc: SparkContext): Int = {
     sc.conf.getInt("spark.mesos.executor.memoryOverhead",
       math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
       sc.executorMemory

http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index a4110d2..e542aa0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -17,19 +17,23 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import java.util
 import java.util.Collections
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos._
 import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.Matchers
+import org.mockito.{ArgumentCaptor, Matchers}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
+import org.apache.spark.rpc.{RpcEndpointRef}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
@@ -37,6 +41,223 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     with MockitoSugar
     with BeforeAndAfter {
 
+  var sparkConf: SparkConf = _
+  var driver: SchedulerDriver = _
+  var taskScheduler: TaskSchedulerImpl = _
+  var backend: CoarseMesosSchedulerBackend = _
+  var externalShuffleClient: MesosExternalShuffleClient = _
+  var driverEndpoint: RpcEndpointRef = _
+
+  test("mesos supports killing and limiting executors") {
+    setBackend()
+    sparkConf.set("spark.driver.host", "driverHost")
+    sparkConf.set("spark.driver.port", "1234")
+
+    val minMem = backend.executorMemory(sc)
+    val minCpu = 4
+    val offers = List((minMem, minCpu))
+
+    // launches a task on a valid offer
+    offerResources(offers)
+    verifyTaskLaunched("o1")
+
+    // kills executors
+    backend.doRequestTotalExecutors(0)
+    assert(backend.doKillExecutors(Seq("0")))
+    val taskID0 = createTaskId("0")
+    verify(driver, times(1)).killTask(taskID0)
+
+    // doesn't launch a new task when requested executors == 0
+    offerResources(offers, 2)
+    verifyDeclinedOffer(driver, createOfferId("o2"))
+
+    // Launches a new task when requested executors is positive
+    backend.doRequestTotalExecutors(2)
+    offerResources(offers, 2)
+    verifyTaskLaunched("o2")
+  }
+
+  test("mesos supports killing and relaunching tasks with executors") {
+    setBackend()
+
+    // launches a task on a valid offer
+    val minMem = backend.executorMemory(sc) + 1024
+    val minCpu = 4
+    val offer1 = (minMem, minCpu)
+    val offer2 = (minMem, 1)
+    offerResources(List(offer1, offer2))
+    verifyTaskLaunched("o1")
+
+    // accounts for a killed task
+    val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
+    backend.statusUpdate(driver, status)
+    verify(driver, times(1)).reviveOffers()
+
+    // Launches a new task on a valid offer from the same slave
+    offerResources(List(offer2))
+    verifyTaskLaunched("o2")
+  }
+
+  test("mesos supports spark.executor.cores") {
+    val executorCores = 4
+    setBackend(Map("spark.executor.cores" -> executorCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    val offers = List((executorMemory * 2, executorCores + 1))
+    offerResources(offers)
+
+    val taskInfos = verifyTaskLaunched("o1")
+    assert(taskInfos.size() == 1)
+
+    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    assert(cpus == executorCores)
+  }
+
+  test("mesos supports unset spark.executor.cores") {
+    setBackend()
+
+    val executorMemory = backend.executorMemory(sc)
+    val offerCores = 10
+    offerResources(List((executorMemory * 2, offerCores)))
+
+    val taskInfos = verifyTaskLaunched("o1")
+    assert(taskInfos.size() == 1)
+
+    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    assert(cpus == offerCores)
+  }
+
+  test("mesos does not acquire more than spark.cores.max") {
+    val maxCores = 10
+    setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List((executorMemory, maxCores + 1)))
+
+    val taskInfos = verifyTaskLaunched("o1")
+    assert(taskInfos.size() == 1)
+
+    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    assert(cpus == maxCores)
+  }
+
+  test("mesos declines offers that violate attribute constraints") {
+    setBackend(Map("spark.mesos.constraints" -> "x:true"))
+    offerResources(List((backend.executorMemory(sc), 4)))
+    verifyDeclinedOffer(driver, createOfferId("o1"), true)
+  }
+
+  test("mesos assigns tasks round-robin on offers") {
+    val executorCores = 4
+    val maxCores = executorCores * 2
+    setBackend(Map("spark.executor.cores" -> executorCores.toString,
+      "spark.cores.max" -> maxCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      (executorMemory * 2, executorCores * 2),
+      (executorMemory * 2, executorCores * 2)))
+
+    verifyTaskLaunched("o1")
+    verifyTaskLaunched("o2")
+  }
+
+  test("mesos creates multiple executors on a single slave") {
+    val executorCores = 4
+    setBackend(Map("spark.executor.cores" -> executorCores.toString))
+
+    // offer with room for two executors
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List((executorMemory * 2, executorCores * 2)))
+
+    // verify two executors were started on a single offer
+    val taskInfos = verifyTaskLaunched("o1")
+    assert(taskInfos.size() == 2)
+  }
+
+  test("mesos doesn't register twice with the same shuffle service") {
+    setBackend(Map("spark.shuffle.service.enabled" -> "true"))
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+    verifyTaskLaunched("o1")
+
+    val offer2 = createOffer("o2", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer2).asJava)
+    verifyTaskLaunched("o2")
+
+    val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
+    backend.statusUpdate(driver, status1)
+
+    val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING)
+    backend.statusUpdate(driver, status2)
+    verify(externalShuffleClient, times(1)).registerDriverWithShuffleService(anyString, anyInt)
+  }
+
+  test("mesos kills an executor when told") {
+    setBackend()
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+    verifyTaskLaunched("o1")
+
+    backend.doKillExecutors(List("0"))
+    verify(driver, times(1)).killTask(createTaskId("0"))
+  }
+
+  private def verifyDeclinedOffer(driver: SchedulerDriver,
+      offerId: OfferID,
+      filter: Boolean = false): Unit = {
+    if (filter) {
+      verify(driver, times(1)).declineOffer(Matchers.eq(offerId), anyObject[Filters])
+    } else {
+      verify(driver, times(1)).declineOffer(Matchers.eq(offerId))
+    }
+  }
+
+  private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
+    val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
+      createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
+
+    backend.resourceOffers(driver, mesosOffers.asJava)
+  }
+
+  private def verifyTaskLaunched(offerId: String): java.util.Collection[TaskInfo] = {
+    val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(createOfferId(offerId))),
+      captor.capture())
+    captor.getValue
+  }
+
+  private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
+    TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(taskId).build())
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+      .setState(state)
+      .build
+  }
+
+
+  private def createOfferId(offerId: String): OfferID = {
+    OfferID.newBuilder().setValue(offerId).build()
+  }
+
+  private def createSlaveId(slaveId: String): SlaveID = {
+    SlaveID.newBuilder().setValue(slaveId).build()
+  }
+
+  private def createExecutorId(executorId: String): ExecutorID = {
+    ExecutorID.newBuilder().setValue(executorId).build()
+  }
+
+  private def createTaskId(taskId: String): TaskID = {
+    TaskID.newBuilder().setValue(taskId).build()
+  }
+
   private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
     val builder = Offer.newBuilder()
     builder.addResourcesBuilder()
@@ -47,8 +268,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
       .setName("cpus")
       .setType(Value.Type.SCALAR)
       .setScalar(Scalar.newBuilder().setValue(cpu))
-    builder.setId(OfferID.newBuilder()
-      .setValue(offerId).build())
+    builder.setId(createOfferId(offerId))
       .setFrameworkId(FrameworkID.newBuilder()
         .setValue("f1"))
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
@@ -58,130 +278,55 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
-      driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
+      driver: SchedulerDriver,
+      shuffleClient: MesosExternalShuffleClient,
+      endpoint: RpcEndpointRef): CoarseMesosSchedulerBackend = {
     val securityManager = mock[SecurityManager]
+
     val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
       override protected def createSchedulerDriver(
-        masterUrl: String,
-        scheduler: Scheduler,
-        sparkUser: String,
-        appName: String,
-        conf: SparkConf,
-        webuiUrl: Option[String] = None,
-        checkpoint: Option[Boolean] = None,
-        failoverTimeout: Option[Double] = None,
-        frameworkId: Option[String] = None): SchedulerDriver = driver
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = driver
+
+      override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
+
+      override protected def createDriverEndpointRef(
+          properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
+
       markRegistered()
     }
     backend.start()
     backend
   }
 
-  var sparkConf: SparkConf = _
-
-  before {
+  private def setBackend(sparkConfVars: Map[String, String] = null) {
     sparkConf = (new SparkConf)
       .setMaster("local[*]")
       .setAppName("test-mesos-dynamic-alloc")
       .setSparkHome("/path")
 
-    sc = new SparkContext(sparkConf)
-  }
-
-  test("mesos supports killing and limiting executors") {
-    val driver = mock[SchedulerDriver]
-    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
-    val taskScheduler = mock[TaskSchedulerImpl]
-    when(taskScheduler.sc).thenReturn(sc)
-
-    sparkConf.set("spark.driver.host", "driverHost")
-    sparkConf.set("spark.driver.port", "1234")
-
-    val backend = createSchedulerBackend(taskScheduler, driver)
-    val minMem = backend.calculateTotalMemory(sc)
-    val minCpu = 4
-
-    val mesosOffers = new java.util.ArrayList[Offer]
-    mesosOffers.add(createOffer("o1", "s1", minMem, minCpu))
-
-    val taskID0 = TaskID.newBuilder().setValue("0").build()
-
-    backend.resourceOffers(driver, mesosOffers)
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
-      any[util.Collection[TaskInfo]],
-      any[Filters])
-
-    // simulate the allocation manager down-scaling executors
-    backend.doRequestTotalExecutors(0)
-    assert(backend.doKillExecutors(Seq("s1/0")))
-    verify(driver, times(1)).killTask(taskID0)
-
-    val mesosOffers2 = new java.util.ArrayList[Offer]
-    mesosOffers2.add(createOffer("o2", "s2", minMem, minCpu))
-    backend.resourceOffers(driver, mesosOffers2)
-
-    verify(driver, times(1))
-      .declineOffer(OfferID.newBuilder().setValue("o2").build())
-
-    // Verify we didn't launch any new executor
-    assert(backend.slaveIdsWithExecutors.size === 1)
-
-    backend.doRequestTotalExecutors(2)
-    backend.resourceOffers(driver, mesosOffers2)
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)),
-      any[util.Collection[TaskInfo]],
-      any[Filters])
+    if (sparkConfVars != null) {
+      for (attr <- sparkConfVars) {
+        sparkConf.set(attr._1, attr._2)
+      }
+    }
 
-    assert(backend.slaveIdsWithExecutors.size === 2)
-    backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build())
-    assert(backend.slaveIdsWithExecutors.size === 1)
-  }
+    sc = new SparkContext(sparkConf)
 
-  test("mesos supports killing and relaunching tasks with executors") {
-    val driver = mock[SchedulerDriver]
+    driver = mock[SchedulerDriver]
     when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
-    val taskScheduler = mock[TaskSchedulerImpl]
+    taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
+    externalShuffleClient = mock[MesosExternalShuffleClient]
+    driverEndpoint = mock[RpcEndpointRef]
 
-    val backend = createSchedulerBackend(taskScheduler, driver)
-    val minMem = backend.calculateTotalMemory(sc) + 1024
-    val minCpu = 4
-
-    val mesosOffers = new java.util.ArrayList[Offer]
-    val offer1 = createOffer("o1", "s1", minMem, minCpu)
-    mesosOffers.add(offer1)
-
-    val offer2 = createOffer("o2", "s1", minMem, 1);
-
-    backend.resourceOffers(driver, mesosOffers)
-
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(offer1.getId)),
-      anyObject(),
-      anyObject[Filters])
-
-    // Simulate task killed, executor no longer running
-    val status = TaskStatus.newBuilder()
-      .setTaskId(TaskID.newBuilder().setValue("0").build())
-      .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
-      .setState(TaskState.TASK_KILLED)
-      .build
-
-    backend.statusUpdate(driver, status)
-    assert(!backend.slaveIdsWithExecutors.contains("s1"))
-
-    mesosOffers.clear()
-    mesosOffers.add(offer2)
-    backend.resourceOffers(driver, mesosOffers)
-    assert(backend.slaveIdsWithExecutors.contains("s1"))
-
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(offer2.getId)),
-      anyObject(),
-      anyObject[Filters])
-
-    verify(driver, times(1)).reviveOffers()
+    backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index e111e2e..3fb3279 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -189,7 +189,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
 
     val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
 
-    val minMem = backend.calculateTotalMemory(sc)
+    val minMem = backend.executorMemory(sc)
     val minCpu = 4
 
     val mesosOffers = new java.util.ArrayList[Offer]

http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index 2eb43b7..85437b2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -41,20 +41,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   test("use at-least minimum overhead") {
     val f = fixture
     when(f.sc.executorMemory).thenReturn(512)
-    utils.calculateTotalMemory(f.sc) shouldBe 896
+    utils.executorMemory(f.sc) shouldBe 896
   }
 
   test("use overhead if it is greater than minimum value") {
     val f = fixture
     when(f.sc.executorMemory).thenReturn(4096)
-    utils.calculateTotalMemory(f.sc) shouldBe 4505
+    utils.executorMemory(f.sc) shouldBe 4505
   }
 
   test("use spark.mesos.executor.memoryOverhead (if set)") {
     val f = fixture
     when(f.sc.executorMemory).thenReturn(1024)
     f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
-    utils.calculateTotalMemory(f.sc) shouldBe 1536
+    utils.executorMemory(f.sc) shouldBe 1536
   }
 
   test("parse a non-empty constraint string correctly") {

http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index cd9dc1b..b07c69c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -825,13 +825,18 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.executor.cores</code></td>
-  <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
   <td>
-    The number of cores to use on each executor. For YARN and standalone mode only.
+    1 in YARN mode, all the available cores on the worker in
+    standalone and Mesos coarse-grained modes.
+  </td>
+  <td>
+    The number of cores to use on each executor.
 
-    In standalone mode, setting this parameter allows an application to run multiple executors on
-    the same worker, provided that there are enough cores on that worker. Otherwise, only one
-    executor per application will run on each worker.
+    In standalone and Mesos coarse-grained modes, setting this
+    parameter allows an application to run multiple executors on the
+    same worker, provided that there are enough cores on that
+    worker. Otherwise, only one executor per application will run on
+    each worker.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/80cb963a/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e1c87a8..0df476d 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -277,9 +277,11 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.mesos.extra.cores</code></td>
   <td><code>0</code></td>
   <td>
-    Set the extra amount of cpus to request per task. This setting is only used for Mesos coarse grain mode.
-    The total amount of cores requested per task is the number of cores in the offer plus the extra cores configured.
-    Note that total amount of cores the executor will request in total will not exceed the <code>spark.cores.max</code> setting.
+    Set the extra number of cores for an executor to advertise. This
+    does not result in more cores allocated.  It instead means that an
+    executor will "pretend" it has more cores, so that the driver will
+    send it more tasks.  Use this to increase parallelism.  This
+    setting is only used for Mesos coarse-grained mode.
   </td>
 </tr>
 <tr>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org