You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/05/02 03:33:22 UTC

spark git commit: [SPARK-6954] [YARN] ExecutorAllocationManager can end up requesting a negative n...

Repository: spark
Updated Branches:
  refs/heads/master ae98eec73 -> 099327d53


[SPARK-6954] [YARN] ExecutorAllocationManager can end up requesting a negative n...

...umber of executors

Author: Sandy Ryza <sa...@cloudera.com>

Closes #5704 from sryza/sandy-spark-6954 and squashes the following commits:

b7890fb [Sandy Ryza] Avoid ramping up to an existing number of executors
6eb516a [Sandy Ryza] SPARK-6954. ExecutorAllocationManager can end up requesting a negative number of executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/099327d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/099327d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/099327d5

Branch: refs/heads/master
Commit: 099327d5376554134c9af49bc2045add4cfb024d
Parents: ae98eec
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Fri May 1 18:32:46 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri May 1 18:33:15 2015 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 101 ++++++-----
 .../spark/ExecutorAllocationManagerSuite.scala  | 171 +++++++++++--------
 2 files changed, 148 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/099327d5/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b986fa8..228d914 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
  *
- * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
- * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
- * persists for another M seconds, then more executors are added and so on. The number added
- * in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached. The upper bound is based both on a configured property
- * and on the number of tasks pending: the policy will never increase the number of executor
- * requests past the number needed to handle all pending tasks.
+ * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
+ * synced to the cluster manager. The target starts at a configured initial value and changes with
+ * the number of pending and running tasks.
+ *
+ * Decreasing the target number of executors happens when the current target is more than needed to
+ * handle the current load. The target number of executors is always truncated to the number of
+ * executors that could run all current running and pending tasks at once.
+ *
+ * Increasing the target number of executors happens in response to backlogged tasks waiting to be
+ * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
+ * the queue persists for another M seconds, then more executors are added and so on. The number
+ * added in each round increases exponentially from the previous round until an upper bound has been
+ * reached. The upper bound is based both on a configured property and on the current number of
+ * running and pending tasks, as described above.
  *
  * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
  * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
   // Number of executors to add in the next round
   private var numExecutorsToAdd = 1
 
-  // Number of executors that have been requested but have not registered yet
-  private var numExecutorsPending = 0
+  // The desired number of executors at this moment in time. If all our executors were to die, this
+  // is the number of executors we would immediately want from the cluster manager.
+  private var numExecutorsTarget =
+    conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
 
   // Executors that have been requested to be removed but have not been killed yet
   private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -200,13 +209,6 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * The number of executors we would have if the cluster manager were to fulfill all our existing
-   * requests.
-   */
-  private def targetNumExecutors(): Int =
-    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
-
-  /**
    * The maximum number of executors we would need under the current load to satisfy all running
    * and pending tasks, rounded up.
    */
@@ -227,7 +229,7 @@ private[spark] class ExecutorAllocationManager(
   private def schedule(): Unit = synchronized {
     val now = clock.getTimeMillis
 
-    addOrCancelExecutorRequests(now)
+    updateAndSyncNumExecutorsTarget(now)
 
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
@@ -239,26 +241,28 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Updates our target number of executors and syncs the result with the cluster manager.
+   *
    * Check to see whether our existing allocation and the requests we've made previously exceed our
-   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
-   * are unneeded.
+   * current needs. If so, truncate our target and let the cluster manager know so that it can
+   * cancel pending requests that are unneeded.
    *
    * If not, and the add time has expired, see if we can request new executors and refresh the add
    * time.
    *
    * @return the delta in the target number of executors.
    */
-  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
-    val currentTarget = targetNumExecutors
+  private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
     val maxNeeded = maxNumExecutorsNeeded
 
-    if (maxNeeded < currentTarget) {
+    if (maxNeeded < numExecutorsTarget) {
       // The target number exceeds the number we actually need, so stop adding new
-      // executors and inform the cluster manager to cancel the extra pending requests.
-      val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
-      client.requestTotalExecutors(newTotalExecutors)
+      // executors and inform the cluster manager to cancel the extra pending requests
+      val oldNumExecutorsTarget = numExecutorsTarget
+      numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
+      client.requestTotalExecutors(numExecutorsTarget)
       numExecutorsToAdd = 1
-      updateNumExecutorsPending(newTotalExecutors)
+      numExecutorsTarget - oldNumExecutorsTarget
     } else if (addTime != NOT_SET && now >= addTime) {
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
@@ -281,21 +285,30 @@ private[spark] class ExecutorAllocationManager(
    */
   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
     // Do not request more executors if it would put our target over the upper bound
-    val currentTarget = targetNumExecutors
-    if (currentTarget >= maxNumExecutors) {
-      logDebug(s"Not adding executors because there are already ${executorIds.size} " +
-        s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+    if (numExecutorsTarget >= maxNumExecutors) {
+      val numExecutorsPending = numExecutorsTarget - executorIds.size
+      logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
+        s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
 
-    val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
-    val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
-    val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
+    val oldNumExecutorsTarget = numExecutorsTarget
+    // There's no point in wasting time ramping up to the number of executors we already have, so
+    // make sure our target is at least as much as our current allocation:
+    numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
+    // Boost our target with the number to add for this round:
+    numExecutorsTarget += numExecutorsToAdd
+    // Ensure that our target doesn't exceed what we need at the present moment:
+    numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
+    // Ensure that our target fits within configured bounds:
+    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+
+    val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
     if (addRequestAcknowledged) {
-      val delta = updateNumExecutorsPending(newTotalExecutors)
+      val delta = numExecutorsTarget - oldNumExecutorsTarget
       logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
-        s" (new desired total will be $newTotalExecutors)")
+        s" (new desired total will be $numExecutorsTarget)")
       numExecutorsToAdd = if (delta == numExecutorsToAdd) {
         numExecutorsToAdd * 2
       } else {
@@ -304,24 +317,12 @@ private[spark] class ExecutorAllocationManager(
       delta
     } else {
       logWarning(
-        s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
+        s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
       0
     }
   }
 
   /**
-   * Given the new target number of executors, update the number of pending executor requests,
-   * and return the delta from the old number of pending requests.
-   */
-  private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
-    val newNumExecutorsPending =
-      newTotalExecutors - executorIds.size + executorsPendingToRemove.size
-    val delta = newNumExecutorsPending - numExecutorsPending
-    numExecutorsPending = newNumExecutorsPending
-    delta
-  }
-
-  /**
    * Request the cluster manager to remove the given executor.
    * Return whether the request is received.
    */
@@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager(
       // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
       executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
-      if (numExecutorsPending > 0) {
-        numExecutorsPending -= 1
-        logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
-      }
     } else {
       logWarning(s"Duplicate executor $executorId has registered")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/099327d5/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 22acc27..49e6de4 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -78,7 +78,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
   test("starting state") {
     sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 1)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
@@ -91,108 +91,108 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Keep adding until the limit is reached
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsTarget(manager) === 2)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsTarget(manager) === 4)
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 4)
-    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsTarget(manager) === 8)
     assert(numExecutorsToAdd(manager) === 8)
-    assert(addExecutors(manager) === 3) // reached the limit of 10
-    assert(numExecutorsPending(manager) === 10)
+    assert(addExecutors(manager) === 2) // reached the limit of 10
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 10)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
 
     // Register previously requested executors
     onExecutorAdded(manager, "first")
-    assert(numExecutorsPending(manager) === 9)
+    assert(numExecutorsTarget(manager) === 10)
     onExecutorAdded(manager, "second")
     onExecutorAdded(manager, "third")
     onExecutorAdded(manager, "fourth")
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsTarget(manager) === 10)
     onExecutorAdded(manager, "first") // duplicates should not count
     onExecutorAdded(manager, "second")
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsTarget(manager) === 10)
 
     // Try adding again
     // This should still fail because the number pending + running is still at the limit
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
   }
 
   test("add executors capped by num pending tasks") {
-    sc = createSparkContext(1, 10)
+    sc = createSparkContext(0, 10)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
 
     // Verify that we're capped at number of tasks in the stage
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsTarget(manager) === 3)
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 5)
+    assert(numExecutorsTarget(manager) === 5)
     assert(numExecutorsToAdd(manager) === 1)
 
-    // Verify that running a task reduces the cap
+    // Verify that running a task doesn't affect the target
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
     sc.listenerBus.postToAll(SparkListenerExecutorAdded(
       0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
     sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
-    assert(numExecutorsPending(manager) === 4)
+    assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 5)
+    assert(numExecutorsTarget(manager) === 6)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsTarget(manager) === 8)
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsTarget(manager) === 8)
     assert(numExecutorsToAdd(manager) === 1)
 
-    // Verify that re-running a task doesn't reduce the cap further
+    // Verify that re-running a task doesn't blow things up
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 8)
+    assert(numExecutorsTarget(manager) === 9)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 9)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a task once we're at our limit doesn't blow things up
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 9)
+    assert(numExecutorsTarget(manager) === 10)
   }
 
   test("cancel pending executors when no longer needed") {
-    sc = createSparkContext(1, 10)
+    sc = createSparkContext(0, 10)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
 
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsTarget(manager) === 3)
 
     val task1Info = createTaskInfo(0, 0, "executor-1")
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
@@ -266,7 +266,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     // Add a few executors
     assert(addExecutors(manager) === 1)
     assert(addExecutors(manager) === 2)
-    assert(addExecutors(manager) === 4)
     onExecutorAdded(manager, "1")
     onExecutorAdded(manager, "2")
     onExecutorAdded(manager, "3")
@@ -274,55 +273,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     onExecutorAdded(manager, "5")
     onExecutorAdded(manager, "6")
     onExecutorAdded(manager, "7")
-    assert(executorIds(manager).size === 7)
+    onExecutorAdded(manager, "8")
+    assert(executorIds(manager).size === 8)
 
     // Remove until limit
     assert(removeExecutor(manager, "1"))
     assert(removeExecutor(manager, "2"))
-    assert(!removeExecutor(manager, "3")) // lower limit reached
-    assert(!removeExecutor(manager, "4"))
+    assert(removeExecutor(manager, "3"))
+    assert(!removeExecutor(manager, "4")) // lower limit reached
+    assert(!removeExecutor(manager, "5"))
     onExecutorRemoved(manager, "1")
     onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
     assert(executorIds(manager).size === 5)
 
     // Add until limit
-    assert(addExecutors(manager) === 5) // upper limit reached
+    assert(addExecutors(manager) === 2) // upper limit reached
     assert(addExecutors(manager) === 0)
-    assert(!removeExecutor(manager, "3")) // still at lower limit
-    assert(!removeExecutor(manager, "4"))
-    onExecutorAdded(manager, "8")
+    assert(!removeExecutor(manager, "4")) // still at lower limit
+    assert(!removeExecutor(manager, "5"))
     onExecutorAdded(manager, "9")
     onExecutorAdded(manager, "10")
     onExecutorAdded(manager, "11")
     onExecutorAdded(manager, "12")
+    onExecutorAdded(manager, "13")
     assert(executorIds(manager).size === 10)
 
     // Remove succeeds again, now that we are no longer at the lower limit
-    assert(removeExecutor(manager, "3"))
     assert(removeExecutor(manager, "4"))
     assert(removeExecutor(manager, "5"))
     assert(removeExecutor(manager, "6"))
+    assert(removeExecutor(manager, "7"))
     assert(executorIds(manager).size === 10)
-    assert(addExecutors(manager) === 1)
-    onExecutorRemoved(manager, "3")
+    assert(addExecutors(manager) === 0)
     onExecutorRemoved(manager, "4")
+    onExecutorRemoved(manager, "5")
     assert(executorIds(manager).size === 8)
 
-    // Add succeeds again, now that we are no longer at the upper limit
-    // Number of executors added restarts at 1
-    assert(addExecutors(manager) === 2)
-    assert(addExecutors(manager) === 1) // upper limit reached
+    // Number of executors pending restarts at 1
+    assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 0)
     assert(executorIds(manager).size === 8)
-    onExecutorRemoved(manager, "5")
     onExecutorRemoved(manager, "6")
-    onExecutorAdded(manager, "13")
+    onExecutorRemoved(manager, "7")
     onExecutorAdded(manager, "14")
+    onExecutorAdded(manager, "15")
     assert(executorIds(manager).size === 8)
     assert(addExecutors(manager) === 0) // still at upper limit
-    onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
+    onExecutorAdded(manager, "17")
     assert(executorIds(manager).size === 10)
+    assert(numExecutorsTarget(manager) === 10)
   }
 
   test("starting/canceling add timer") {
@@ -405,33 +406,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
   }
 
   test("mock polling loop with no events") {
-    sc = createSparkContext(1, 20)
+    sc = createSparkContext(0, 20)
     val manager = sc.executorAllocationManager.get
     val clock = new ManualClock(2020L)
     manager.setClock(clock)
 
     // No events - we should not be adding or removing
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(100L)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(1000L)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(10000L)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
   test("mock polling loop add behavior") {
-    sc = createSparkContext(1, 20)
+    sc = createSparkContext(0, 20)
     val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -441,43 +442,43 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     onSchedulerBacklogged(manager)
     clock.advance(schedulerBacklogTimeout * 1000 / 2)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
+    assert(numExecutorsTarget(manager) === 0) // timer not exceeded yet
     clock.advance(schedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 1) // first timer exceeded
+    assert(numExecutorsTarget(manager) === 1) // first timer exceeded
     clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
+    assert(numExecutorsTarget(manager) === 1) // second timer not exceeded yet
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
+    assert(numExecutorsTarget(manager) === 1 + 2) // second timer exceeded
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
+    assert(numExecutorsTarget(manager) === 1 + 2 + 4) // third timer exceeded
 
     // Scheduler queue drained
     onSchedulerQueueEmpty(manager)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7) // timer is canceled
+    assert(numExecutorsTarget(manager) === 7) // timer is canceled
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsTarget(manager) === 7)
 
     // Scheduler queue backlogged again
     onSchedulerBacklogged(manager)
     clock.advance(schedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
+    assert(numExecutorsTarget(manager) === 7 + 1) // timer restarted
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7 + 1 + 2)
+    assert(numExecutorsTarget(manager) === 7 + 1 + 2)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
+    assert(numExecutorsTarget(manager) === 7 + 1 + 2 + 4)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 20) // limit reached
+    assert(numExecutorsTarget(manager) === 20) // limit reached
   }
 
   test("mock polling loop remove behavior") {
@@ -671,6 +672,31 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     assert(!removeTimes(manager).contains("executor-1"))
   }
 
+  test("avoid ramp up when target < running executors") {
+    sc = createSparkContext(0, 100000)
+    val manager = sc.executorAllocationManager.get
+    val stage1 = createStageInfo(0, 1000)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 2)
+    assert(addExecutors(manager) === 4)
+    assert(addExecutors(manager) === 8)
+    assert(numExecutorsTarget(manager) === 15)
+    (0 until 15).foreach { i =>
+      onExecutorAdded(manager, s"executor-$i")
+    }
+    assert(executorIds(manager).size === 15)
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
+
+    adjustRequestedExecutors(manager)
+    assert(numExecutorsTarget(manager) === 0)
+
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000)))
+    addExecutors(manager)
+    assert(numExecutorsTarget(manager) === 16)
+  }
+
   private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
     val conf = new SparkConf()
       .setMaster("local")
@@ -713,7 +739,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
    * ------------------------------------------------------- */
 
   private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
-  private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+  private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget)
   private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
   private val _executorsPendingToRemove =
     PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
@@ -722,7 +748,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
   private val _schedule = PrivateMethod[Unit]('schedule)
   private val _addExecutors = PrivateMethod[Int]('addExecutors)
-  private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
+  private val _updateAndSyncNumExecutorsTarget =
+    PrivateMethod[Int]('updateAndSyncNumExecutorsTarget)
   private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
   private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
   private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
@@ -735,8 +762,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _numExecutorsToAdd()
   }
 
-  private def numExecutorsPending(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _numExecutorsPending()
+  private def numExecutorsTarget(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _numExecutorsTarget()
   }
 
   private def executorsPendingToRemove(
@@ -766,7 +793,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   }
 
   private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _addOrCancelExecutorRequests(0L)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(0L)
   }
 
   private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org