You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/07/09 22:26:49 UTC

spark git commit: [SPARK-6287] [MESOS] Add dynamic allocation to the coarse-grained Mesos scheduler

Repository: spark
Updated Branches:
  refs/heads/master ebdf58538 -> c4830598b


[SPARK-6287] [MESOS] Add dynamic allocation to the coarse-grained Mesos scheduler

This is largely based on extracting the dynamic allocation parts from tnachen's #3861.

Author: Iulian Dragos <ja...@gmail.com>

Closes #4984 from dragos/issue/mesos-coarse-dynamicAllocation and squashes the following commits:

39df8cd [Iulian Dragos] Update tests to latest changes in core.
9d2c9fa [Iulian Dragos] Remove adjustment of executorLimitOption in doKillExecutors.
8b00f52 [Iulian Dragos] Latest round of reviews.
0cd00e0 [Iulian Dragos] Add persistent shuffle directory
15c45c1 [Iulian Dragos] Add dynamic allocation to the Spark coarse-grained scheduler.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4830598
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4830598
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4830598

Branch: refs/heads/master
Commit: c4830598b271cc6390d127bd4cf8ab02b28792e0
Parents: ebdf585
Author: Iulian Dragos <ja...@gmail.com>
Authored: Thu Jul 9 13:26:46 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Jul 9 13:26:46 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  19 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     | 136 +++++++++++---
 .../cluster/mesos/MesosSchedulerUtils.scala     |   4 +-
 .../apache/spark/storage/DiskBlockManager.scala |   8 +-
 .../scala/org/apache/spark/util/Utils.scala     |  45 +++--
 .../CoarseMesosSchedulerBackendSuite.scala      | 175 +++++++++++++++++++
 6 files changed, 331 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c4830598/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d2547ee..82704b1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -532,7 +532,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _executorAllocationManager =
       if (dynamicAllocationEnabled) {
         assert(supportDynamicAllocation,
-          "Dynamic allocation of executors is currently only supported in YARN mode")
+          "Dynamic allocation of executors is currently only supported in YARN and Mesos mode")
         Some(new ExecutorAllocationManager(this, listenerBus, _conf))
       } else {
         None
@@ -853,7 +853,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       minPartitions).setName(path)
   }
 
-
   /**
    * :: Experimental ::
    *
@@ -1364,10 +1363,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * Return whether dynamically adjusting the amount of resources allocated to
-   * this application is supported. This is currently only available for YARN.
+   * this application is supported. This is currently only available for YARN
+   * and Mesos coarse-grained mode.
    */
-  private[spark] def supportDynamicAllocation =
-    master.contains("yarn") || _conf.getBoolean("spark.dynamicAllocation.testing", false)
+  private[spark] def supportDynamicAllocation: Boolean = {
+    (master.contains("yarn")
+      || master.contains("mesos")
+      || _conf.getBoolean("spark.dynamicAllocation.testing", false))
+  }
 
   /**
    * :: DeveloperApi ::
@@ -1385,7 +1388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
     assert(supportDynamicAllocation,
-      "Requesting executors is currently only supported in YARN mode")
+      "Requesting executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestTotalExecutors(numExecutors)
@@ -1403,7 +1406,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   @DeveloperApi
   override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     assert(supportDynamicAllocation,
-      "Requesting executors is currently only supported in YARN mode")
+      "Requesting executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestExecutors(numAdditionalExecutors)
@@ -1421,7 +1424,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   @DeveloperApi
   override def killExecutors(executorIds: Seq[String]): Boolean = {
     assert(supportDynamicAllocation,
-      "Killing executors is currently only supported in YARN mode")
+      "Killing executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.killExecutors(executorIds)

http://git-wip-us.apache.org/repos/asf/spark/blob/c4830598/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index b68f8c7..cbade13 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,11 +18,14 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{List => JList}
+import java.util.{List => JList, Collections}
+import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
+import com.google.common.collect.HashBiMap
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
@@ -60,9 +63,27 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val slaveIdsWithExecutors = new HashSet[String]
 
-  val taskIdToSlaveId = new HashMap[Int, String]
-  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+  val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
+  // How many times tasks on each slave failed
+  val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
+
+  /**
+   * The total number of executors we aim to have. Undefined when not using dynamic allocation
+   * and before the ExecutorAllocationManager calls [[doRequestTotalExecutors]].
+   */
+  private var executorLimitOption: Option[Int] = None
+
+  /**
+   *  Return the current executor limit, which may be [[Int.MaxValue]]
+   *  before it is properly initialized.
+   */
+  private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
+
+  private val pendingRemovedSlaveIds = new HashSet[String]
 
+  // private lock object protecting mutable state above. Using the intrinsic lock
+  // may lead to deadlocks since the superclass might also try to lock
+  private val stateLock = new ReentrantLock
 
   val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
 
@@ -86,7 +107,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
   }
 
-  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+  def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
     val executorSparkHome = conf.getOption("spark.mesos.executor.home")
       .orElse(sc.getSparkHome())
       .getOrElse {
@@ -120,10 +141,6 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = sc.env.rpcEnv.uriOf(
-      SparkEnv.driverActorSystemName,
-      RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
-      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
 
     val uri = conf.getOption("spark.executor.uri")
       .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
@@ -133,7 +150,7 @@ private[spark] class CoarseMesosSchedulerBackend(
       command.setValue(
         "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
           .format(prefixEnv, runScript) +
-        s" --driver-url $driverUrl" +
+        s" --driver-url $driverURL" +
         s" --executor-id ${offer.getSlaveId.getValue}" +
         s" --hostname ${offer.getHostname}" +
         s" --cores $numCores" +
@@ -142,11 +159,12 @@ private[spark] class CoarseMesosSchedulerBackend(
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.get.split('/').last.split('.').head
+      val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)
       command.setValue(
         s"cd $basename*; $prefixEnv " +
          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
-        s" --driver-url $driverUrl" +
-        s" --executor-id ${offer.getSlaveId.getValue}" +
+        s" --driver-url $driverURL" +
+        s" --executor-id $executorId" +
         s" --hostname ${offer.getHostname}" +
         s" --cores $numCores" +
         s" --app-id $appId")
@@ -155,6 +173,17 @@ private[spark] class CoarseMesosSchedulerBackend(
     command.build()
   }
 
+  protected def driverURL: String = {
+    if (conf.contains("spark.testing")) {
+      "driverURL"
+    } else {
+      sc.env.rpcEnv.uriOf(
+        SparkEnv.driverActorSystemName,
+        RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
+        CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+    }
+  }
+
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
@@ -172,17 +201,18 @@ private[spark] class CoarseMesosSchedulerBackend(
    * unless we've already launched more than we wanted to.
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    synchronized {
+    stateLock.synchronized {
       val filters = Filters.newBuilder().setRefuseSeconds(5).build()
       for (offer <- offers) {
         val offerAttributes = toAttributeMap(offer.getAttributesList)
         val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
-        val slaveId = offer.getSlaveId.toString
+        val slaveId = offer.getSlaveId.getValue
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         val id = offer.getId.getValue
-        if (meetsConstraints &&
+        if (taskIdToSlaveId.size < executorLimit &&
             totalCoresAcquired < maxCores &&
+            meetsConstraints &&
             mem >= calculateTotalMemory(sc) &&
             cpus >= 1 &&
             failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
@@ -197,7 +227,7 @@ private[spark] class CoarseMesosSchedulerBackend(
           val task = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
             .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
+            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
             .setName("Task " + taskId)
             .addResources(createResource("cpus", cpusToUse))
             .addResources(createResource("mem", calculateTotalMemory(sc)))
@@ -209,7 +239,9 @@ private[spark] class CoarseMesosSchedulerBackend(
 
           // accept the offer and launch the task
           logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
-          d.launchTasks(List(offer.getId), List(task.build()), filters)
+          d.launchTasks(
+            Collections.singleton(offer.getId),
+            Collections.singleton(task.build()), filters)
         } else {
           // Decline the offer
           logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -224,7 +256,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     val taskId = status.getTaskId.getValue.toInt
     val state = status.getState
     logInfo("Mesos task " + taskId + " is now " + state)
-    synchronized {
+    stateLock.synchronized {
       if (TaskState.isFinished(TaskState.fromMesos(state))) {
         val slaveId = taskIdToSlaveId(taskId)
         slaveIdsWithExecutors -= slaveId
@@ -242,8 +274,9 @@ private[spark] class CoarseMesosSchedulerBackend(
                 "is Spark installed on it?")
           }
         }
+        executorTerminated(d, slaveId, s"Executor finished with state $state")
         // In case we'd rejected everything before but have now lost a node
-        mesosDriver.reviveOffers()
+        d.reviveOffers()
       }
     }
   }
@@ -262,18 +295,39 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
-    synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
-        // Note that the slave ID corresponds to the executor ID on that slave
-        slaveIdsWithExecutors -= slaveId.getValue
-        removeExecutor(slaveId.getValue, "Mesos slave lost")
+  /**
+   * Called when a slave is lost or a Mesos task finished. Update local view on
+   * what tasks are running and remove the terminated slave from the list of pending
+   * slave IDs that we might have asked to be killed. It also notifies the driver
+   * that an executor was removed.
+   */
+  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+    stateLock.synchronized {
+      if (slaveIdsWithExecutors.contains(slaveId)) {
+        val slaveIdToTaskId = taskIdToSlaveId.inverse()
+        if (slaveIdToTaskId.contains(slaveId)) {
+          val taskId: Int = slaveIdToTaskId.get(slaveId)
+          taskIdToSlaveId.remove(taskId)
+          removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+        }
+        // TODO: This assumes one Spark executor per Mesos slave,
+        // which may no longer be true after SPARK-5095
+        pendingRemovedSlaveIds -= slaveId
+        slaveIdsWithExecutors -= slaveId
       }
     }
   }
 
-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+  private def sparkExecutorId(slaveId: String, taskId: String): String = {
+    s"$slaveId/$taskId"
+  }
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
+    logInfo("Mesos slave lost: " + slaveId.getValue)
+    executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
+  }
+
+  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
     logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
     slaveLost(d, s)
   }
@@ -284,4 +338,34 @@ private[spark] class CoarseMesosSchedulerBackend(
       super.applicationId
     }
 
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    // We don't truly know if we can fulfill the full amount of executors
+    // since at coarse grain it depends on the amount of slaves available.
+    logInfo("Capping the total amount of executors to " + requestedTotal)
+    executorLimitOption = Some(requestedTotal)
+    true
+  }
+
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    if (mesosDriver == null) {
+      logWarning("Asked to kill executors before the Mesos driver was started.")
+      return false
+    }
+
+    val slaveIdToTaskId = taskIdToSlaveId.inverse()
+    for (executorId <- executorIds) {
+      val slaveId = executorId.split("/")(0)
+      if (slaveIdToTaskId.contains(slaveId)) {
+        mesosDriver.killTask(
+          TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
+        pendingRemovedSlaveIds += slaveId
+      } else {
+        logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+      }
+    }
+    // no need to adjust `executorLimitOption` since the AllocationManager already communicated
+    // the desired limit through a call to `doRequestTotalExecutors`.
+    // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
+    true
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c4830598/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index d8a8c84..925702e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
 import scala.util.control.NonFatal
 
 import com.google.common.base.Splitter
-import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
+import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos}
 import org.apache.mesos.Protos._
 import org.apache.mesos.protobuf.GeneratedMessage
 import org.apache.spark.{Logging, SparkContext}
@@ -39,7 +39,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   private final val registerLatch = new CountDownLatch(1)
 
   // Driver for talking to Mesos
-  protected var mesosDriver: MesosSchedulerDriver = null
+  protected var mesosDriver: SchedulerDriver = null
 
   /**
    * Starts the MesosSchedulerDriver with the provided information. This method returns

http://git-wip-us.apache.org/repos/asf/spark/blob/c4830598/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 91ef863..5f53769 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -124,10 +124,16 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     (blockId, getFile(blockId))
   }
 
+  /**
+   * Create local directories for storing block data. These directories are
+   * located inside configured local directories and won't
+   * be deleted on JVM exit when using the external shuffle service.
+   */
   private def createLocalDirs(conf: SparkConf): Array[File] = {
-    Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
+    Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
       try {
         val localDir = Utils.createDirectory(rootDir, "blockmgr")
+        Utils.chmod700(localDir)
         logInfo(s"Created local directory at $localDir")
         Some(localDir)
       } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/c4830598/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 944560a..b6b9321 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -733,7 +733,12 @@ private[spark] object Utils extends Logging {
     localRootDirs
   }
 
-  private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
+  /**
+   * Return the configured local directories where Spark can write files. This
+   * method does not create any directories on its own; it only encapsulates the
+   * logic of locating the local directories according to deployment mode.
+   */
+  def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
     if (isRunningInYarnContainer(conf)) {
       // If we are in yarn mode, systems can have different disk layouts so we must set it
       // to what Yarn on this system said was available. Note this assumes that Yarn has
@@ -749,27 +754,29 @@ private[spark] object Utils extends Logging {
       Option(conf.getenv("SPARK_LOCAL_DIRS"))
         .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
         .split(",")
-        .flatMap { root =>
-          try {
-            val rootDir = new File(root)
-            if (rootDir.exists || rootDir.mkdirs()) {
-              val dir = createTempDir(root)
-              chmod700(dir)
-              Some(dir.getAbsolutePath)
-            } else {
-              logError(s"Failed to create dir in $root. Ignoring this directory.")
-              None
-            }
-          } catch {
-            case e: IOException =>
-            logError(s"Failed to create local root dir in $root. Ignoring this directory.")
-            None
-          }
-        }
-        .toArray
     }
   }
 
+  private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
+    getConfiguredLocalDirs(conf).flatMap { root =>
+      try {
+        val rootDir = new File(root)
+        if (rootDir.exists || rootDir.mkdirs()) {
+          val dir = createTempDir(root)
+          chmod700(dir)
+          Some(dir.getAbsolutePath)
+        } else {
+          logError(s"Failed to create dir in $root. Ignoring this directory.")
+          None
+        }
+      } catch {
+        case e: IOException =>
+          logError(s"Failed to create local root dir in $root. Ignoring this directory.")
+          None
+      }
+    }.toArray
+  }
+
   /** Get the Yarn approved local directories. */
   private def getYarnLocalDirs(conf: SparkConf): String = {
     // Hadoop 0.23 and 2.x have different Environment variable names for the

http://git-wip-us.apache.org/repos/asf/spark/blob/c4830598/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
new file mode 100644
index 0000000..3f16929
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util
+import java.util.Collections
+
+import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.Protos._
+import org.apache.mesos.SchedulerDriver
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.Matchers
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+
+class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
+    with LocalSparkContext
+    with MockitoSugar
+    with BeforeAndAfter {
+
+  private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
+    val builder = Offer.newBuilder()
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(mem))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(cpu))
+    builder.setId(OfferID.newBuilder()
+      .setValue(offerId).build())
+      .setFrameworkId(FrameworkID.newBuilder()
+        .setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+      .setHostname(s"host${slaveId}")
+      .build()
+  }
+
+  private def createSchedulerBackend(
+      taskScheduler: TaskSchedulerImpl,
+      driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
+    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
+      mesosDriver = driver
+      markRegistered()
+    }
+    backend.start()
+    backend
+  }
+
+  var sparkConf: SparkConf = _
+
+  before {
+    sparkConf = (new SparkConf)
+      .setMaster("local[*]")
+      .setAppName("test-mesos-dynamic-alloc")
+      .setSparkHome("/path")
+
+    sc = new SparkContext(sparkConf)
+  }
+
+  test("mesos supports killing and limiting executors") {
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+
+    sparkConf.set("spark.driver.host", "driverHost")
+    sparkConf.set("spark.driver.port", "1234")
+
+    val backend = createSchedulerBackend(taskScheduler, driver)
+    val minMem = backend.calculateTotalMemory(sc).toInt
+    val minCpu = 4
+
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(createOffer("o1", "s1", minMem, minCpu))
+
+    val taskID0 = TaskID.newBuilder().setValue("0").build()
+
+    backend.resourceOffers(driver, mesosOffers)
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+      any[util.Collection[TaskInfo]],
+      any[Filters])
+
+    // simulate the allocation manager down-scaling executors
+    backend.doRequestTotalExecutors(0)
+    assert(backend.doKillExecutors(Seq("s1/0")))
+    verify(driver, times(1)).killTask(taskID0)
+
+    val mesosOffers2 = new java.util.ArrayList[Offer]
+    mesosOffers2.add(createOffer("o2", "s2", minMem, minCpu))
+    backend.resourceOffers(driver, mesosOffers2)
+
+    verify(driver, times(1))
+      .declineOffer(OfferID.newBuilder().setValue("o2").build())
+
+    // Verify we didn't launch any new executor
+    assert(backend.slaveIdsWithExecutors.size === 1)
+
+    backend.doRequestTotalExecutors(2)
+    backend.resourceOffers(driver, mesosOffers2)
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)),
+      any[util.Collection[TaskInfo]],
+      any[Filters])
+
+    assert(backend.slaveIdsWithExecutors.size === 2)
+    backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build())
+    assert(backend.slaveIdsWithExecutors.size === 1)
+  }
+
+  test("mesos supports killing and relaunching tasks with executors") {
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+
+    val backend = createSchedulerBackend(taskScheduler, driver)
+    val minMem = backend.calculateTotalMemory(sc).toInt + 1024
+    val minCpu = 4
+
+    val mesosOffers = new java.util.ArrayList[Offer]
+    val offer1 = createOffer("o1", "s1", minMem, minCpu)
+    mesosOffers.add(offer1)
+
+    val offer2 = createOffer("o2", "s1", minMem, 1);
+
+    backend.resourceOffers(driver, mesosOffers)
+
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(offer1.getId)),
+      anyObject(),
+      anyObject[Filters])
+
+    // Simulate task killed, executor no longer running
+    val status = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue("0").build())
+      .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
+      .setState(TaskState.TASK_KILLED)
+      .build
+
+    backend.statusUpdate(driver, status)
+    assert(!backend.slaveIdsWithExecutors.contains("s1"))
+
+    mesosOffers.clear()
+    mesosOffers.add(offer2)
+    backend.resourceOffers(driver, mesosOffers)
+    assert(backend.slaveIdsWithExecutors.contains("s1"))
+
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(offer2.getId)),
+      anyObject(),
+      anyObject[Filters])
+
+    verify(driver, times(1)).reviveOffers()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org