Posted to commits@spark.apache.org by an...@apache.org on 2014/10/30 01:49:28 UTC

git commit: [SPARK-3795] Heuristics for dynamically scaling executors

Repository: spark
Updated Branches:
  refs/heads/master e7fd80413 -> 8d59b37b0


[SPARK-3795] Heuristics for dynamically scaling executors

This is part of a bigger effort to provide elastic scaling of executors within a Spark application ([SPARK-3174](https://issues.apache.org/jira/browse/SPARK-3174)). This PR does not provide any functionality by itself; it is a skeleton that is missing a mechanism to be added later in [SPARK-3822](https://issues.apache.org/jira/browse/SPARK-3822).

Comments and feedback are most welcome. For those of you reviewing this in detail, I highly recommend doing it through your favorite IDE instead of through the diff here.

Author: Andrew Or <an...@gmail.com>
Author: Andrew Or <an...@databricks.com>

Closes #2746 from andrewor14/scaling-heuristics and squashes the following commits:

8a4fdaa [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
e045df8 [Andrew Or] Add warning message (minor)
dfa31ec [Andrew Or] Fix tests
c0becc4 [Andrew Or] Merging with SPARK-3822
4784f93 [Andrew Or] Reword an awkward log message
181f27f [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
c79e907 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
4672b90 [Andrew Or] It's nano time.
a6a30f2 [Andrew Or] Do not allow min/max executors of 0
c60ec33 [Andrew Or] Rewrite test logic with clocks
b00b680 [Andrew Or] Fix style
c3caa65 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
7f9da14 [Andrew Or] Factor out logic to verify bounds on # executors (minor)
f279019 [Andrew Or] Add time mocking tests for polling loop
685e347 [Andrew Or] Factor out clock in polling loop to facilitate testing
3cea7f7 [Andrew Or] Use PrivateMethodTester to keep original class private
3156d81 [Andrew Or] Update comments and exception messages
92f36f9 [Andrew Or] Address minor review comments
abdea61 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
2aefd09 [Andrew Or] Correct listener behavior
9fe6e44 [Andrew Or] Rename variables and configs + update comments and log messages
149cc32 [Andrew Or] Fix style
254c958 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5ff829b [Andrew Or] Add tests for ExecutorAllocationManager
19c6c4b [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5896515 [Andrew Or] Move ExecutorAllocationManager out of scheduler package
9ca8945 [Andrew Or] Rewrite callbacks through the listener interface
5e336b9 [Andrew Or] Remove code from backend to avoid conflict with SPARK-3822
092d1fd [Andrew Or] Remove timeout logic for pending requests
1309fab [Andrew Or] Request executors by specifying the number pending
8bc0e9d [Andrew Or] Add logic to expire pending requests after timeouts
b750ee1 [Andrew Or] Express timers in terms of expiration times + remove retry logic
7f8dd47 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
9d516cc [Andrew Or] Bug fix: Actually trigger the add timer / add retry timer
44f1832 [Andrew Or] Rename configs to include time units
eaae7ef [Andrew Or] Address various review comments
6f8be6c [Andrew Or] Beef up comments on what each of the timers mean
baaa403 [Andrew Or] Simplify variable names (minor)
42beec8 [Andrew Or] Reset whether the add threshold is crossed on cancellation
9bcc0bc [Andrew Or] ExecutorScalingManager -> ExecutorAllocationManager
2784398 [Andrew Or] Merge branch 'master' of github.com:apache/spark into scaling-heuristics
5a97d9e [Andrew Or] Log retry attempts in INFO + clean up logging
2f55c9f [Andrew Or] Do not keep requesting executors even after max attempts
0acd1cb [Andrew Or] Rewrite timer logic with polling
b3c7d44 [Andrew Or] Start the retry timer for adding executors at the right time
9b5f2ea [Andrew Or] Wording changes in comments and log messages
c2203a5 [Andrew Or] Simplify code to access the scheduler backend
e519d08 [Andrew Or] Simplify initialization code
2cc87a7 [Andrew Or] Add retry logic for removing executors
d0b34a6 [Andrew Or] Add retry logic for adding executors
9cc4649 [Andrew Or] Simplifying synchronization logic
67c03c7 [Andrew Or] Correct semantics of adding executors + update comments
6c48ab0 [Andrew Or] Update synchronization comment
8901900 [Andrew Or] Simplify remove policy + change the semantics of add policy
1cc8444 [Andrew Or] Minor wording change
ae5b64a [Andrew Or] Add synchronization
20ec6b9 [Andrew Or] First cut implementation of removing executors dynamically
4077ae2 [Andrew Or] Minor code re-organization
6f1fa66 [Andrew Or] First cut implementation of adding executors dynamically
b2e6dcc [Andrew Or] Add skeleton interface for requesting / killing executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d59b37b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d59b37b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d59b37b

Branch: refs/heads/master
Commit: 8d59b37b02eb36f37bcefafb952519d7dca744ad
Parents: e7fd804
Author: Andrew Or <an...@gmail.com>
Authored: Wed Oct 29 17:48:59 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Oct 29 17:48:59 2014 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 462 +++++++++++++
 .../scala/org/apache/spark/SparkContext.scala   |  35 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 662 +++++++++++++++++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |   2 +-
 4 files changed, 1150 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d59b37b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
new file mode 100644
index 0000000..b2cf022
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.scheduler._
+
+/**
+ * An agent that dynamically allocates and removes executors based on the workload.
+ *
+ * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
+ * the scheduler queue is not drained in M seconds, then new executors are added. If the queue
+ * persists for another N seconds, then more executors are added and so on. The number added
+ * in each round increases exponentially from the previous round until an upper bound on the
+ * number of executors has been reached.
+ *
+ * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
+ * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
+ * we may add more executors than we need just to remove them later. (2) Executors should be added
+ * quickly over time in case the maximum number of executors is very high. Otherwise, it will take
+ * a long time to ramp up under heavy workloads.
+ *
+ * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not
+ * been scheduled to run any tasks, then it is removed.
+ *
+ * There is no retry logic in either case because we make the assumption that the cluster manager
+ * will eventually fulfill all requests it receives asynchronously.
+ *
+ * The relevant Spark properties include the following:
+ *
+ *   spark.dynamicAllocation.enabled - Whether this feature is enabled
+ *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
+ *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ *
+ *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
+ *     If there are backlogged tasks for this duration, add new executors
+ *
+ *   spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) -
+ *     If the backlog is sustained for this duration, add more executors
+ *     This is used only after the initial backlog timeout is exceeded
+ *
+ *   spark.dynamicAllocation.executorIdleTimeout (K) -
+ *     If an executor has been idle for this duration, remove it
+ */
+private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging {
+  import ExecutorAllocationManager._
+
+  private val conf = sc.conf
+
+  // Lower and upper bounds on the number of executors. These are required.
+  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
+  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+  verifyBounds()
+
+  // How long there must be backlogged tasks for before an addition is triggered
+  private val schedulerBacklogTimeout = conf.getLong(
+    "spark.dynamicAllocation.schedulerBacklogTimeout", 60)
+
+  // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
+  private val sustainedSchedulerBacklogTimeout = conf.getLong(
+    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+
+  // How long an executor must be idle for before it is removed
+  private val removeThresholdSeconds = conf.getLong(
+    "spark.dynamicAllocation.executorIdleTimeout", 600)
+
+  // Number of executors to add in the next round
+  private var numExecutorsToAdd = 1
+
+  // Number of executors that have been requested but have not registered yet
+  private var numExecutorsPending = 0
+
+  // Executors that have been requested to be removed but have not been killed yet
+  private val executorsPendingToRemove = new mutable.HashSet[String]
+
+  // All known executors
+  private val executorIds = new mutable.HashSet[String]
+
+  // A timestamp of when an addition should be triggered, or NOT_SET if it is not set
+  // This is set when pending tasks are added but not scheduled yet
+  private var addTime: Long = NOT_SET
+
+  // A timestamp for each executor of when the executor should be removed, indexed by the ID
+  // This is set when an executor is no longer running a task, or when it first registers
+  private val removeTimes = new mutable.HashMap[String, Long]
+
+  // Polling loop interval (ms)
+  private val intervalMillis: Long = 100
+
+  // Whether we are testing this class. This should only be used internally.
+  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+
+  // Clock used to schedule when executors should be added and removed
+  private var clock: Clock = new RealClock
+
+  /**
+   * Verify that the lower and upper bounds on the number of executors are valid.
+   * If not, throw an appropriate exception.
+   */
+  private def verifyBounds(): Unit = {
+    if (minNumExecutors < 0 || maxNumExecutors < 0) {
+      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+    }
+    if (minNumExecutors == 0 || maxNumExecutors == 0) {
+      throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+    }
+    if (minNumExecutors > maxNumExecutors) {
+      throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
+        s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
+    }
+  }
+
+  /**
+   * Use a different clock for this allocation manager. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+    clock = newClock
+  }
+
+  /**
+   * Register for scheduler callbacks to decide when to add and remove executors.
+   */
+  def start(): Unit = {
+    val listener = new ExecutorAllocationListener(this)
+    sc.addSparkListener(listener)
+    startPolling()
+  }
+
+  /**
+   * Start the main polling thread that keeps track of when to add and remove executors.
+   */
+  private def startPolling(): Unit = {
+    val t = new Thread {
+      override def run(): Unit = {
+        while (true) {
+          try {
+            schedule()
+          } catch {
+            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
+          }
+          Thread.sleep(intervalMillis)
+        }
+      }
+    }
+    t.setName("spark-dynamic-executor-allocation")
+    t.setDaemon(true)
+    t.start()
+  }
+
+  /**
+   * If the add time has expired, request new executors and refresh the add time.
+   * If the remove time for an existing executor has expired, kill the executor.
+   * This is factored out into its own method for testing.
+   */
+  private def schedule(): Unit = synchronized {
+    val now = clock.getTimeMillis
+    if (addTime != NOT_SET && now >= addTime) {
+      addExecutors()
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+    }
+
+    removeTimes.foreach { case (executorId, expireTime) =>
+      if (now >= expireTime) {
+        removeExecutor(executorId)
+        removeTimes.remove(executorId)
+      }
+    }
+  }
+
+  /**
+   * Request a number of executors from the cluster manager.
+   * If the cap on the number of executors is reached, give up and reset the
+   * number of executors to add next round instead of continuing to double it.
+   * Return the number actually requested.
+   */
+  private def addExecutors(): Int = synchronized {
+    // Do not request more executors if we have already reached the upper bound
+    val numExistingExecutors = executorIds.size + numExecutorsPending
+    if (numExistingExecutors >= maxNumExecutors) {
+      logDebug(s"Not adding executors because there are already ${executorIds.size} " +
+        s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+      numExecutorsToAdd = 1
+      return 0
+    }
+
+    // Request executors with respect to the upper bound
+    val actualNumExecutorsToAdd =
+      if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
+        numExecutorsToAdd
+      } else {
+        maxNumExecutors - numExistingExecutors
+      }
+    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
+    val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
+    if (addRequestAcknowledged) {
+      logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
+        s"tasks are backlogged (new desired total will be $newTotalExecutors)")
+      numExecutorsToAdd =
+        if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
+      numExecutorsPending += actualNumExecutorsToAdd
+      actualNumExecutorsToAdd
+    } else {
+      logWarning(s"Unable to reach the cluster manager " +
+        s"to request $actualNumExecutorsToAdd executors!")
+      0
+    }
+  }
+
+  /**
+   * Request the cluster manager to remove the given executor.
+   * Return whether the request is received.
+   */
+  private def removeExecutor(executorId: String): Boolean = synchronized {
+    // Do not kill the executor if we are not aware of it (should never happen)
+    if (!executorIds.contains(executorId)) {
+      logWarning(s"Attempted to remove unknown executor $executorId!")
+      return false
+    }
+
+    // Do not kill the executor again if it is already pending to be killed (should never happen)
+    if (executorsPendingToRemove.contains(executorId)) {
+      logWarning(s"Attempted to remove executor $executorId " +
+        s"when it is already pending to be removed!")
+      return false
+    }
+
+    // Do not kill the executor if we have already reached the lower bound
+    val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
+    if (numExistingExecutors - 1 < minNumExecutors) {
+      logInfo(s"Not removing idle executor $executorId because there are only " +
+        s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
+      return false
+    }
+
+    // Send a request to the backend to kill this executor
+    val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
+    if (removeRequestAcknowledged) {
+      logInfo(s"Removing executor $executorId because it has been idle for " +
+        s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
+      executorsPendingToRemove.add(executorId)
+      true
+    } else {
+      logWarning(s"Unable to reach the cluster manager to kill executor $executorId!")
+      false
+    }
+  }
+
+  /**
+   * Callback invoked when the specified executor has been added.
+   */
+  private def onExecutorAdded(executorId: String): Unit = synchronized {
+    if (!executorIds.contains(executorId)) {
+      executorIds.add(executorId)
+      executorIds.foreach(onExecutorIdle)
+      logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
+      if (numExecutorsPending > 0) {
+        numExecutorsPending -= 1
+        logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
+      }
+    } else {
+      logWarning(s"Duplicate executor $executorId has registered")
+    }
+  }
+
+  /**
+   * Callback invoked when the specified executor has been removed.
+   */
+  private def onExecutorRemoved(executorId: String): Unit = synchronized {
+    if (executorIds.contains(executorId)) {
+      executorIds.remove(executorId)
+      removeTimes.remove(executorId)
+      logInfo(s"Existing executor $executorId has been removed (new total is ${executorIds.size})")
+      if (executorsPendingToRemove.contains(executorId)) {
+        executorsPendingToRemove.remove(executorId)
+        logDebug(s"Executor $executorId is no longer pending to " +
+          s"be removed (${executorsPendingToRemove.size} left)")
+      }
+    } else {
+      logWarning(s"Unknown executor $executorId has been removed!")
+    }
+  }
+
+  /**
+   * Callback invoked when the scheduler receives new pending tasks.
+   * This sets a time in the future that decides when executors should be added
+   * if it is not already set.
+   */
+  private def onSchedulerBacklogged(): Unit = synchronized {
+    if (addTime == NOT_SET) {
+      logDebug(s"Starting timer to add executors because pending tasks " +
+        s"are building up (to expire in $schedulerBacklogTimeout seconds)")
+      addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
+    }
+  }
+
+  /**
+   * Callback invoked when the scheduler queue is drained.
+   * This resets all variables used for adding executors.
+   */
+  private def onSchedulerQueueEmpty(): Unit = synchronized {
+    logDebug(s"Clearing timer to add executors because there are no more pending tasks")
+    addTime = NOT_SET
+    numExecutorsToAdd = 1
+  }
+
+  /**
+   * Callback invoked when the specified executor is no longer running any tasks.
+   * This sets a time in the future that decides when this executor should be removed if
+   * the executor is not already marked as idle.
+   */
+  private def onExecutorIdle(executorId: String): Unit = synchronized {
+    if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+      logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
+        s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
+      removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
+    }
+  }
+
+  /**
+   * Callback invoked when the specified executor is now running a task.
+   * This resets all variables used for removing this executor.
+   */
+  private def onExecutorBusy(executorId: String): Unit = synchronized {
+    logDebug(s"Clearing idle timer for $executorId because it is now running a task")
+    removeTimes.remove(executorId)
+  }
+
+  /**
+   * A listener that notifies the given allocation manager of when to add and remove executors.
+   *
+   * This class is intentionally conservative in its assumptions about the relative ordering
+   * and consistency of events returned by the listener. For simplicity, it does not account
+   * for speculated tasks.
+   */
+  private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager)
+    extends SparkListener {
+
+    private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
+    private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
+    private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
+
+    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+      synchronized {
+        val stageId = stageSubmitted.stageInfo.stageId
+        val numTasks = stageSubmitted.stageInfo.numTasks
+        stageIdToNumTasks(stageId) = numTasks
+        allocationManager.onSchedulerBacklogged()
+      }
+    }
+
+    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+      synchronized {
+        val stageId = stageCompleted.stageInfo.stageId
+        stageIdToNumTasks -= stageId
+        stageIdToTaskIndices -= stageId
+
+        // If this is the last stage with pending tasks, mark the scheduler queue as empty
+        // This is needed in case the stage is aborted for any reason
+        if (stageIdToNumTasks.isEmpty) {
+          allocationManager.onSchedulerQueueEmpty()
+        }
+      }
+    }
+
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+      val stageId = taskStart.stageId
+      val taskId = taskStart.taskInfo.taskId
+      val taskIndex = taskStart.taskInfo.index
+      val executorId = taskStart.taskInfo.executorId
+
+      // If this is the last pending task, mark the scheduler queue as empty
+      stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+      val numTasksScheduled = stageIdToTaskIndices(stageId).size
+      val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
+      if (numTasksScheduled == numTasksTotal) {
+        // No more pending tasks for this stage
+        stageIdToNumTasks -= stageId
+        if (stageIdToNumTasks.isEmpty) {
+          allocationManager.onSchedulerQueueEmpty()
+        }
+      }
+
+      // Mark the executor on which this task is scheduled as busy
+      executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+      allocationManager.onExecutorBusy(executorId)
+    }
+
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+      val executorId = taskEnd.taskInfo.executorId
+      val taskId = taskEnd.taskInfo.taskId
+
+      // If the executor is no longer running any tasks, mark it as idle
+      if (executorIdToTaskIds.contains(executorId)) {
+        executorIdToTaskIds(executorId) -= taskId
+        if (executorIdToTaskIds(executorId).isEmpty) {
+          executorIdToTaskIds -= executorId
+          allocationManager.onExecutorIdle(executorId)
+        }
+      }
+    }
+
+    override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
+      val executorId = blockManagerAdded.blockManagerId.executorId
+      if (executorId != "<driver>") {
+        allocationManager.onExecutorAdded(executorId)
+      }
+    }
+
+    override def onBlockManagerRemoved(
+        blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
+      allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+    }
+  }
+
+}
+
+private object ExecutorAllocationManager {
+  val NOT_SET = Long.MaxValue
+}
+
+/**
+ * An abstract clock for measuring elapsed time.
+ */
+private trait Clock {
+  def getTimeMillis: Long
+}
+
+/**
+ * A clock backed by a monotonically increasing time source.
+ * The time returned by this clock does not correspond to any notion of wall-clock time.
+ */
+private class RealClock extends Clock {
+  override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
+}
+
+/**
+ * A clock that allows the caller to customize the time.
+ * This is used mainly for testing.
+ */
+private class TestClock(startTimeMillis: Long) extends Clock {
+  private var time: Long = startTimeMillis
+  override def getTimeMillis: Long = time
+  def tick(ms: Long): Unit = { time += ms }
+}
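
The ramp-up behaviour of addExecutors() in the file above can be tried in isolation. What follows is a minimal, Spark-free sketch and not the class from this commit: RampUpSketch, pending, and RampUpDemo are hypothetical names, registered executors are left out (only pending requests count toward the cap), and every request is assumed to be acknowledged.

```scala
// Hypothetical sketch of the exponential add policy; not part of Spark.
final class RampUpSketch(maxNumExecutors: Int) {
  private var numExecutorsToAdd = 1   // size of the next request
  private var numExecutorsPending = 0 // requested so far (nothing registers in this sketch)

  def pending: Int = numExecutorsPending

  /** Request the next batch and return how many executors were asked for. */
  def addExecutors(): Int = {
    if (numExecutorsPending >= maxNumExecutors) {
      numExecutorsToAdd = 1 // at the cap: give up and stop doubling
      0
    } else {
      // Never ask for more than the room left under the cap
      val actual = math.min(numExecutorsToAdd, maxNumExecutors - numExecutorsPending)
      // Double only if the full batch fit; otherwise start over at 1
      numExecutorsToAdd = if (actual == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
      numExecutorsPending += actual
      actual
    }
  }
}

object RampUpDemo {
  def main(args: Array[String]): Unit = {
    val sketch = new RampUpSketch(maxNumExecutors = 10)
    // Five consecutive rounds with a sustained backlog
    val requested = Seq.fill(5)(sketch.addExecutors())
    println(requested.mkString(", ")) // 1, 2, 4, 3, 0
  }
}
```

With a cap of 10 the batch sizes are 1, 2, 4, then 3 (clamped to the remaining room), then 0, which is the same sequence the "add executors" test in this commit asserts.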

http://git-wip-us.apache.org/repos/asf/spark/blob/8d59b37b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 40ea369..73668e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -330,6 +330,15 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     } else None
   }
 
+  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
+  private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
+    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      Some(new ExecutorAllocationManager(this))
+    } else {
+      None
+    }
+  executorAllocationManager.foreach(_.start())
+
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()
 
@@ -860,36 +869,42 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   /**
    * :: DeveloperApi ::
    * Request an additional number of executors from the cluster manager.
-   * This is currently only supported in Yarn mode.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def requestExecutors(numAdditionalExecutors: Int): Unit = {
+  def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors)
-      case _ => logWarning("Requesting executors is only supported in coarse-grained mode")
+      case b: CoarseGrainedSchedulerBackend =>
+        b.requestExecutors(numAdditionalExecutors)
+      case _ =>
+        logWarning("Requesting executors is only supported in coarse-grained mode")
+        false
     }
   }
 
   /**
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
-   * This is currently only supported in Yarn mode.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def killExecutors(executorIds: Seq[String]): Unit = {
+  def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds)
-      case _ => logWarning("Killing executors is only supported in coarse-grained mode")
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutors(executorIds)
+      case _ =>
+        logWarning("Killing executors is only supported in coarse-grained mode")
+        false
     }
   }
 
   /**
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executor.
-   * This is currently only supported in Yarn mode.
+   * This is currently only supported in Yarn mode. Return whether the request is received.
    */
   @DeveloperApi
-  def killExecutor(executorId: String): Unit = killExecutors(Seq(executorId))
+  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
 
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
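
Returning Boolean from requestExecutors and killExecutors lets a caller update its bookkeeping only when the request was actually received, which is how ExecutorAllocationManager uses the result. The sketch below illustrates that caller-side pattern under stated assumptions: RequestSink, PendingTracker, and AckDemo are hypothetical names for illustration and are not Spark APIs.

```scala
// Hypothetical stand-in for a scheduler backend; not a Spark API.
trait RequestSink {
  def requestExecutors(n: Int): Boolean // true if the request was received
}

final class PendingTracker(sink: RequestSink) {
  private var pending = 0
  def numPending: Int = pending

  /** Ask for n executors; count them as pending only if the request was received. */
  def request(n: Int): Int =
    if (sink.requestExecutors(n)) {
      pending += n
      n
    } else {
      0 // request not received: leave the pending count untouched
    }
}

object AckDemo {
  def main(args: Array[String]): Unit = {
    val reachable = new RequestSink { def requestExecutors(n: Int): Boolean = true }
    val unreachable = new RequestSink { def requestExecutors(n: Int): Boolean = false }
    println(new PendingTracker(reachable).request(3))   // 3
    println(new PendingTracker(unreachable).request(3)) // 0
  }
}
```

With the earlier Unit return type a caller had no way to tell these two cases apart and could only assume the request went through.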

http://git-wip-us.apache.org/repos/asf/spark/blob/8d59b37b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
new file mode 100644
index 0000000..f0aa914
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerId
+
+/**
+ * Test add and remove behavior of ExecutorAllocationManager.
+ */
+class ExecutorAllocationManagerSuite extends FunSuite {
+  import ExecutorAllocationManager._
+  import ExecutorAllocationManagerSuite._
+
+  test("verify min/max executors") {
+    // No min or max
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test-executor-allocation-manager")
+      .set("spark.dynamicAllocation.enabled", "true")
+    intercept[SparkException] { new SparkContext(conf) }
+
+    // Only min
+    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+    intercept[SparkException] { new SparkContext(conf1) }
+
+    // Only max
+    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+    intercept[SparkException] { new SparkContext(conf2) }
+
+    // Both min and max, but min > max
+    intercept[SparkException] { createSparkContext(2, 1) }
+
+    // Both min and max, and min == max
+    val sc1 = createSparkContext(1, 1)
+    assert(sc1.executorAllocationManager.isDefined)
+    sc1.stop()
+
+    // Both min and max, and min < max
+    val sc2 = createSparkContext(1, 2)
+    assert(sc2.executorAllocationManager.isDefined)
+    sc2.stop()
+  }
+
+  test("starting state") {
+    val sc = createSparkContext()
+    val manager = sc.executorAllocationManager.get
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(executorIds(manager).isEmpty)
+    assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
+    assert(removeTimes(manager).isEmpty)
+    sc.stop()
+  }
+
+  test("add executors") {
+    val sc = createSparkContext(1, 10)
+    val manager = sc.executorAllocationManager.get
+
+    // Keep adding until the limit is reached
+    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 4)
+    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsToAdd(manager) === 8)
+    assert(addExecutors(manager) === 3) // reached the limit of 10
+    assert(numExecutorsPending(manager) === 10)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsPending(manager) === 10)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Register previously requested executors
+    onExecutorAdded(manager, "first")
+    assert(numExecutorsPending(manager) === 9)
+    onExecutorAdded(manager, "second")
+    onExecutorAdded(manager, "third")
+    onExecutorAdded(manager, "fourth")
+    assert(numExecutorsPending(manager) === 6)
+    onExecutorAdded(manager, "first") // duplicates should not count
+    onExecutorAdded(manager, "second")
+    assert(numExecutorsPending(manager) === 6)
+
+    // Try adding again
+    // This should still fail because the number pending + running is still at the limit
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsToAdd(manager) === 1)
+    sc.stop()
+  }
+
+  test("remove executors") {
+    val sc = createSparkContext(5, 10)
+    val manager = sc.executorAllocationManager.get
+    (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
+
+    // Keep removing until the limit is reached
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(removeExecutor(manager, "1"))
+    assert(executorsPendingToRemove(manager).size === 1)
+    assert(executorsPendingToRemove(manager).contains("1"))
+    assert(removeExecutor(manager, "2"))
+    assert(removeExecutor(manager, "3"))
+    assert(executorsPendingToRemove(manager).size === 3)
+    assert(executorsPendingToRemove(manager).contains("2"))
+    assert(executorsPendingToRemove(manager).contains("3"))
+    assert(!removeExecutor(manager, "100")) // remove non-existent executors
+    assert(!removeExecutor(manager, "101"))
+    assert(executorsPendingToRemove(manager).size === 3)
+    assert(removeExecutor(manager, "4"))
+    assert(removeExecutor(manager, "5"))
+    assert(!removeExecutor(manager, "6")) // reached the limit of 5
+    assert(executorsPendingToRemove(manager).size === 5)
+    assert(executorsPendingToRemove(manager).contains("4"))
+    assert(executorsPendingToRemove(manager).contains("5"))
+    assert(!executorsPendingToRemove(manager).contains("6"))
+
+    // Kill executors previously requested to remove
+    onExecutorRemoved(manager, "1")
+    assert(executorsPendingToRemove(manager).size === 4)
+    assert(!executorsPendingToRemove(manager).contains("1"))
+    onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
+    assert(executorsPendingToRemove(manager).size === 2)
+    assert(!executorsPendingToRemove(manager).contains("2"))
+    assert(!executorsPendingToRemove(manager).contains("3"))
+    onExecutorRemoved(manager, "2") // duplicates should not count
+    onExecutorRemoved(manager, "3")
+    assert(executorsPendingToRemove(manager).size === 2)
+    onExecutorRemoved(manager, "4")
+    onExecutorRemoved(manager, "5")
+    assert(executorsPendingToRemove(manager).isEmpty)
+
+    // Try removing again
+    // This should still fail because the number of executors is already at the lower limit
+    assert(!removeExecutor(manager, "7"))
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(!removeExecutor(manager, "8"))
+    assert(executorsPendingToRemove(manager).isEmpty)
+    sc.stop()
+  }
+
+  test("interleaving add and remove") {
+    val sc = createSparkContext(5, 10)
+    val manager = sc.executorAllocationManager.get
+
+    // Add a few executors
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 2)
+    assert(addExecutors(manager) === 4)
+    onExecutorAdded(manager, "1")
+    onExecutorAdded(manager, "2")
+    onExecutorAdded(manager, "3")
+    onExecutorAdded(manager, "4")
+    onExecutorAdded(manager, "5")
+    onExecutorAdded(manager, "6")
+    onExecutorAdded(manager, "7")
+    assert(executorIds(manager).size === 7)
+
+    // Remove until limit
+    assert(removeExecutor(manager, "1"))
+    assert(removeExecutor(manager, "2"))
+    assert(!removeExecutor(manager, "3")) // lower limit reached
+    assert(!removeExecutor(manager, "4"))
+    onExecutorRemoved(manager, "1")
+    onExecutorRemoved(manager, "2")
+    assert(executorIds(manager).size === 5)
+
+    // Add until limit
+    assert(addExecutors(manager) === 5) // upper limit reached
+    assert(addExecutors(manager) === 0)
+    assert(!removeExecutor(manager, "3")) // still at lower limit
+    assert(!removeExecutor(manager, "4"))
+    onExecutorAdded(manager, "8")
+    onExecutorAdded(manager, "9")
+    onExecutorAdded(manager, "10")
+    onExecutorAdded(manager, "11")
+    onExecutorAdded(manager, "12")
+    assert(executorIds(manager).size === 10)
+
+    // Remove succeeds again, now that we are no longer at the lower limit
+    assert(removeExecutor(manager, "3"))
+    assert(removeExecutor(manager, "4"))
+    assert(removeExecutor(manager, "5"))
+    assert(removeExecutor(manager, "6"))
+    assert(executorIds(manager).size === 10)
+    assert(addExecutors(manager) === 0) // still at upper limit
+    onExecutorRemoved(manager, "3")
+    onExecutorRemoved(manager, "4")
+    assert(executorIds(manager).size === 8)
+
+    // Add succeeds again, now that we are no longer at the upper limit
+    // Number of executors added restarts at 1
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 1) // upper limit reached again
+    assert(addExecutors(manager) === 0)
+    assert(executorIds(manager).size === 8)
+    onExecutorRemoved(manager, "5")
+    onExecutorRemoved(manager, "6")
+    onExecutorAdded(manager, "13")
+    onExecutorAdded(manager, "14")
+    assert(executorIds(manager).size === 8)
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 1) // upper limit reached again
+    assert(addExecutors(manager) === 0)
+    onExecutorAdded(manager, "15")
+    onExecutorAdded(manager, "16")
+    assert(executorIds(manager).size === 10)
+    sc.stop()
+  }
+
+  test("starting/canceling add timer") {
+    val sc = createSparkContext(2, 10)
+    val clock = new TestClock(8888L)
+    val manager = sc.executorAllocationManager.get
+    manager.setClock(clock)
+
+    // Starting add timer is idempotent
+    assert(addTime(manager) === NOT_SET)
+    onSchedulerBacklogged(manager)
+    val firstAddTime = addTime(manager)
+    assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
+    clock.tick(100L)
+    onSchedulerBacklogged(manager)
+    assert(addTime(manager) === firstAddTime) // timer is already started
+    clock.tick(200L)
+    onSchedulerBacklogged(manager)
+    assert(addTime(manager) === firstAddTime)
+    onSchedulerQueueEmpty(manager)
+
+    // Restart add timer
+    clock.tick(1000L)
+    assert(addTime(manager) === NOT_SET)
+    onSchedulerBacklogged(manager)
+    val secondAddTime = addTime(manager)
+    assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
+    clock.tick(100L)
+    onSchedulerBacklogged(manager)
+    assert(addTime(manager) === secondAddTime) // timer is already started
+    assert(addTime(manager) !== firstAddTime)
+    assert(firstAddTime !== secondAddTime)
+  }
+
+  test("starting/canceling remove timers") {
+    val sc = createSparkContext(2, 10)
+    val clock = new TestClock(14444L)
+    val manager = sc.executorAllocationManager.get
+    manager.setClock(clock)
+
+    // Starting remove timer is idempotent for each executor
+    assert(removeTimes(manager).isEmpty)
+    onExecutorIdle(manager, "1")
+    assert(removeTimes(manager).size === 1)
+    assert(removeTimes(manager).contains("1"))
+    val firstRemoveTime = removeTimes(manager)("1")
+    assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
+    clock.tick(100L)
+    onExecutorIdle(manager, "1")
+    assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already started
+    clock.tick(200L)
+    onExecutorIdle(manager, "1")
+    assert(removeTimes(manager)("1") === firstRemoveTime)
+    clock.tick(300L)
+    onExecutorIdle(manager, "2")
+    assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
+    assert(removeTimes(manager)("2") === clock.getTimeMillis + executorIdleTimeout * 1000)
+    clock.tick(400L)
+    onExecutorIdle(manager, "3")
+    assert(removeTimes(manager)("3") !== firstRemoveTime)
+    assert(removeTimes(manager)("3") === clock.getTimeMillis + executorIdleTimeout * 1000)
+    assert(removeTimes(manager).size === 3)
+    assert(removeTimes(manager).contains("2"))
+    assert(removeTimes(manager).contains("3"))
+
+    // Restart remove timer
+    clock.tick(1000L)
+    onExecutorBusy(manager, "1")
+    assert(removeTimes(manager).size === 2)
+    onExecutorIdle(manager, "1")
+    assert(removeTimes(manager).size === 3)
+    assert(removeTimes(manager).contains("1"))
+    val secondRemoveTime = removeTimes(manager)("1")
+    assert(secondRemoveTime === clock.getTimeMillis + executorIdleTimeout * 1000)
+    assert(removeTimes(manager)("1") === secondRemoveTime) // timer is already started
+    assert(removeTimes(manager)("1") !== firstRemoveTime)
+    assert(firstRemoveTime !== secondRemoveTime)
+  }
+
+  test("mock polling loop with no events") {
+    val sc = createSparkContext(1, 20)
+    val manager = sc.executorAllocationManager.get
+    val clock = new TestClock(2020L)
+    manager.setClock(clock)
+
+    // No events - we should not be adding or removing
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(100L)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(1000L)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(10000L)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0)
+    assert(executorsPendingToRemove(manager).isEmpty)
+  }
+
+  test("mock polling loop add behavior") {
+    val sc = createSparkContext(1, 20)
+    val clock = new TestClock(2020L)
+    val manager = sc.executorAllocationManager.get
+    manager.setClock(clock)
+
+    // Scheduler queue backlogged
+    onSchedulerBacklogged(manager)
+    clock.tick(schedulerBacklogTimeout * 1000 / 2)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
+    clock.tick(schedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 1) // first timer exceeded
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000 / 2)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
+
+    // Scheduler queue drained
+    onSchedulerQueueEmpty(manager)
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7) // timer is canceled
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7)
+
+    // Scheduler queue backlogged again
+    onSchedulerBacklogged(manager)
+    clock.tick(schedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7 + 1 + 2)
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
+    clock.tick(sustainedSchedulerBacklogTimeout * 1000)
+    schedule(manager)
+    assert(numExecutorsPending(manager) === 20) // limit reached
+  }
+
+  test("mock polling loop remove behavior") {
+    val sc = createSparkContext(1, 20)
+    val clock = new TestClock(2020L)
+    val manager = sc.executorAllocationManager.get
+    manager.setClock(clock)
+
+    // Remove idle executors on timeout
+    onExecutorAdded(manager, "executor-1")
+    onExecutorAdded(manager, "executor-2")
+    onExecutorAdded(manager, "executor-3")
+    assert(removeTimes(manager).size === 3)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(executorIdleTimeout * 1000 / 2)
+    schedule(manager)
+    assert(removeTimes(manager).size === 3) // idle threshold not reached yet
+    assert(executorsPendingToRemove(manager).isEmpty)
+    clock.tick(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(removeTimes(manager).isEmpty) // idle threshold exceeded
+    assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
+
+    // Mark a subset as busy - only idle executors should be removed
+    onExecutorAdded(manager, "executor-4")
+    onExecutorAdded(manager, "executor-5")
+    onExecutorAdded(manager, "executor-6")
+    onExecutorAdded(manager, "executor-7")
+    assert(removeTimes(manager).size === 5)              // 5 active executors
+    assert(executorsPendingToRemove(manager).size === 2) // 2 pending to be removed
+    onExecutorBusy(manager, "executor-4")
+    onExecutorBusy(manager, "executor-5")
+    onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones)
+    schedule(manager)
+    assert(removeTimes(manager).size === 2) // remove only idle executors
+    assert(!removeTimes(manager).contains("executor-4"))
+    assert(!removeTimes(manager).contains("executor-5"))
+    assert(!removeTimes(manager).contains("executor-6"))
+    assert(executorsPendingToRemove(manager).size === 2)
+    clock.tick(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(removeTimes(manager).isEmpty) // idle executors are removed
+    assert(executorsPendingToRemove(manager).size === 4)
+    assert(!executorsPendingToRemove(manager).contains("executor-4"))
+    assert(!executorsPendingToRemove(manager).contains("executor-5"))
+    assert(!executorsPendingToRemove(manager).contains("executor-6"))
+
+    // Busy executors are now idle and should be removed
+    onExecutorIdle(manager, "executor-4")
+    onExecutorIdle(manager, "executor-5")
+    onExecutorIdle(manager, "executor-6")
+    schedule(manager)
+    assert(removeTimes(manager).size === 3) // 0 busy and 3 idle
+    assert(removeTimes(manager).contains("executor-4"))
+    assert(removeTimes(manager).contains("executor-5"))
+    assert(removeTimes(manager).contains("executor-6"))
+    assert(executorsPendingToRemove(manager).size === 4)
+    clock.tick(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(removeTimes(manager).isEmpty)
+    assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)
+  }
+
+  test("listeners trigger add executors correctly") {
+    val sc = createSparkContext(2, 10)
+    val manager = sc.executorAllocationManager.get
+    assert(addTime(manager) === NOT_SET)
+
+    // Starting a stage should start the add timer
+    val numTasks = 10
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks)))
+    assert(addTime(manager) !== NOT_SET)
+
+    // Starting a subset of the tasks should not cancel the add timer
+    val taskInfos = (0 until numTasks).map { i => createTaskInfo(i, i, "executor-1") }
+    taskInfos.tail.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
+    assert(addTime(manager) !== NOT_SET)
+
+    // Starting all remaining tasks should cancel the add timer
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head))
+    assert(addTime(manager) === NOT_SET)
+
+    // Start two different stages
+    // The add timer should be canceled only if all tasks in both stages start running
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks)))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks)))
+    assert(addTime(manager) !== NOT_SET)
+    taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) }
+    assert(addTime(manager) !== NOT_SET)
+    taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) }
+    assert(addTime(manager) === NOT_SET)
+  }
+
+  test("listeners trigger remove executors correctly") {
+    val sc = createSparkContext(2, 10)
+    val manager = sc.executorAllocationManager.get
+    assert(removeTimes(manager).isEmpty)
+
+    // Added executors should start the remove timers for each executor
+    (1 to 5).map("executor-" + _).foreach { id => onExecutorAdded(manager, id) }
+    assert(removeTimes(manager).size === 5)
+
+    // Starting a task should cancel the remove timer for that executor
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1")))
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2")))
+    assert(removeTimes(manager).size === 3)
+    assert(!removeTimes(manager).contains("executor-1"))
+    assert(!removeTimes(manager).contains("executor-2"))
+
+    // Finishing all tasks running on an executor should start the remove timer for that executor
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+      0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new TaskMetrics))
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+      0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new TaskMetrics))
+    assert(removeTimes(manager).size === 4)
+    assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not finished yet
+    assert(removeTimes(manager).contains("executor-2"))
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+      0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new TaskMetrics))
+    assert(removeTimes(manager).size === 5)
+    assert(removeTimes(manager).contains("executor-1")) // executor-1 has now finished
+  }
+
+  test("listeners trigger add and remove executor callbacks correctly") {
+    val sc = createSparkContext(2, 10)
+    val manager = sc.executorAllocationManager.get
+    assert(executorIds(manager).isEmpty)
+    assert(removeTimes(manager).isEmpty)
+
+    // New executors have registered
+    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    assert(executorIds(manager).size === 1)
+    assert(executorIds(manager).contains("executor-1"))
+    assert(removeTimes(manager).size === 1)
+    assert(removeTimes(manager).contains("executor-1"))
+    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
+      0L, BlockManagerId("executor-2", "host2", 1), 100L))
+    assert(executorIds(manager).size === 2)
+    assert(executorIds(manager).contains("executor-2"))
+    assert(removeTimes(manager).size === 2)
+    assert(removeTimes(manager).contains("executor-2"))
+
+    // Existing executors have disconnected
+    sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
+      0L, BlockManagerId("executor-1", "host1", 1)))
+    assert(executorIds(manager).size === 1)
+    assert(!executorIds(manager).contains("executor-1"))
+    assert(removeTimes(manager).size === 1)
+    assert(!removeTimes(manager).contains("executor-1"))
+
+    // Unknown executor has disconnected
+    sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
+      0L, BlockManagerId("executor-3", "host3", 1)))
+    assert(executorIds(manager).size === 1)
+    assert(removeTimes(manager).size === 1)
+  }
+
+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+  private val schedulerBacklogTimeout = 1L
+  private val sustainedSchedulerBacklogTimeout = 2L
+  private val executorIdleTimeout = 3L
+
+  private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test-executor-allocation-manager")
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
+      .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
+      .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString)
+      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
+        sustainedSchedulerBacklogTimeout.toString)
+      .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
+      .set("spark.dynamicAllocation.testing", "true")
+    new SparkContext(conf)
+  }
+
+  private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
+    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
+  }
+
+  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
+    new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
+  }
+
+  /* ------------------------------------------------------- *
+   | Helper methods for accessing private methods and fields |
+   * ------------------------------------------------------- */
+
+  private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
+  private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+  private val _executorsPendingToRemove =
+    PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
+  private val _executorIds = PrivateMethod[collection.Set[String]]('executorIds)
+  private val _addTime = PrivateMethod[Long]('addTime)
+  private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
+  private val _schedule = PrivateMethod[Unit]('schedule)
+  private val _addExecutors = PrivateMethod[Int]('addExecutors)
+  private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
+  private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
+  private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
+  private val _onSchedulerBacklogged = PrivateMethod[Unit]('onSchedulerBacklogged)
+  private val _onSchedulerQueueEmpty = PrivateMethod[Unit]('onSchedulerQueueEmpty)
+  private val _onExecutorIdle = PrivateMethod[Unit]('onExecutorIdle)
+  private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
+
+  private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _numExecutorsToAdd()
+  }
+
+  private def numExecutorsPending(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _numExecutorsPending()
+  }
+
+  private def executorsPendingToRemove(
+      manager: ExecutorAllocationManager): collection.Set[String] = {
+    manager invokePrivate _executorsPendingToRemove()
+  }
+
+  private def executorIds(manager: ExecutorAllocationManager): collection.Set[String] = {
+    manager invokePrivate _executorIds()
+  }
+
+  private def addTime(manager: ExecutorAllocationManager): Long = {
+    manager invokePrivate _addTime()
+  }
+
+  private def removeTimes(manager: ExecutorAllocationManager): collection.Map[String, Long] = {
+    manager invokePrivate _removeTimes()
+  }
+
+  private def schedule(manager: ExecutorAllocationManager): Unit = {
+    manager invokePrivate _schedule()
+  }
+
+  private def addExecutors(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _addExecutors()
+  }
+
+  private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
+    manager invokePrivate _removeExecutor(id)
+  }
+
+  private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onExecutorAdded(id)
+  }
+
+  private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onExecutorRemoved(id)
+  }
+
+  private def onSchedulerBacklogged(manager: ExecutorAllocationManager): Unit = {
+    manager invokePrivate _onSchedulerBacklogged()
+  }
+
+  private def onSchedulerQueueEmpty(manager: ExecutorAllocationManager): Unit = {
+    manager invokePrivate _onSchedulerQueueEmpty()
+  }
+
+  private def onExecutorIdle(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onExecutorIdle(id)
+  }
+
+  private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onExecutorBusy(id)
+  }
+}
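
Reviewer note: the "add executors" and "interleaving add and remove" tests above imply an exponential ramp-up on the add side. Each successful round doubles the next request (1, 2, 4, ...), a request is truncated at the upper bound, and the step size resets to 1 once a request is truncated or refused. The sketch below models only that bookkeeping so the expected numbers in the tests are easier to follow. It is an assumption-laden illustration, not the code in this commit: the class name AddPolicySketch and its demo object are hypothetical, and the real ExecutorAllocationManager also deals with timers, removal, and the cluster manager.

```scala
import scala.collection.mutable

// Hypothetical model of the add-side counters exercised by the tests above.
// Not the real ExecutorAllocationManager; names and structure are assumptions.
final class AddPolicySketch(maxExecutors: Int) {
  private val executorIds = mutable.HashSet.empty[String]
  var numExecutorsPending = 0 // requested but not yet registered
  var numExecutorsToAdd = 1   // size of the next request

  /** Request more executors; returns how many were actually requested. */
  def addExecutors(): Int = {
    val existing = executorIds.size + numExecutorsPending
    if (existing >= maxExecutors) {
      // Already at the upper bound: refuse and restart the ramp-up
      numExecutorsToAdd = 1
      0
    } else {
      val actual = math.min(numExecutorsToAdd, maxExecutors - existing)
      numExecutorsPending += actual
      // Double only if the full request was granted; otherwise start over at 1
      numExecutorsToAdd = if (actual == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
      actual
    }
  }

  /** Register an executor; duplicate registrations are ignored. */
  def onExecutorAdded(id: String): Unit = {
    if (executorIds.add(id) && numExecutorsPending > 0) {
      numExecutorsPending -= 1
    }
  }
}

object AddPolicySketchDemo {
  def main(args: Array[String]): Unit = {
    val policy = new AddPolicySketch(maxExecutors = 10)
    // Same sequence the "add executors" test expects with a limit of 10
    val granted = Seq.fill(5)(policy.addExecutors())
    assert(granted == Seq(1, 2, 4, 3, 0))
    assert(policy.numExecutorsPending == 10)

    // Registering executors moves them from pending to running; the total is unchanged
    Seq("first", "second", "third", "fourth", "first").foreach(policy.onExecutorAdded)
    assert(policy.numExecutorsPending == 6)
    assert(policy.addExecutors() == 0) // pending + running is still at the limit
  }
}
```

Under these assumptions the fourth request is cut from 8 to 3 because only 3 slots remain, which is why the test then expects numExecutorsToAdd to be back at 1.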

http://git-wip-us.apache.org/repos/asf/spark/blob/8d59b37b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6807379..e90672c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -505,7 +505,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         driver ! x
 
       case RequestExecutors(requestedTotal) =>
-        logInfo(s"Driver requested a total number of executors of $requestedTotal.")
+        logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
         Option(allocator) match {
           case Some(a) => a.requestTotalExecutors(requestedTotal)
           case None => logWarning("Container allocator is not ready to request executors yet.")

