Posted to commits@spark.apache.org by tg...@apache.org on 2020/07/23 17:34:08 UTC

[spark] branch master updated: [SPARK-31418][SCHEDULER] Request more executors when dynamic allocation is enabled and a task becomes unschedulable due to Spark's blacklisting feature

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e7fb67c  [SPARK-31418][SCHEDULER] Request more executors when dynamic allocation is enabled and a task becomes unschedulable due to Spark's blacklisting feature
e7fb67c is described below

commit e7fb67cd880511452b94c2077429868e72998c05
Author: Venkata krishnan Sowrirajan <vs...@linkedin.com>
AuthorDate: Thu Jul 23 12:33:22 2020 -0500

    [SPARK-31418][SCHEDULER] Request more executors when dynamic allocation is enabled and a task becomes unschedulable due to Spark's blacklisting feature
    
    ### What changes were proposed in this pull request?
    In this change, when dynamic allocation is enabled, instead of aborting immediately when a taskset becomes unschedulable due to blacklisting, we post a `SparkListenerUnschedulableTaskSetAdded` event, which is handled by `ExecutorAllocationManager` to request the additional executors needed to schedule the unschedulable blacklisted tasks. Once the event is sent, we start the abortTimer, similar to [SPARK-22148][SPARK-15815], to abort in the case when no new executors launched either due [...]
    
    ### Why are the changes needed?
    This is an improvement. When dynamic allocation is enabled, this change requests more executors to schedule the unschedulable tasks instead of aborting the stage without even retrying up to spark.task.maxFailures times (in some cases not retrying at all). This is a potential issue with respect to Spark's fault tolerance.
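    
    For illustration only, a minimal sketch of a configuration under which this code path is exercised; the property values below are assumptions, not part of this patch:
    
        // Hedged sketch: dynamic allocation plus blacklisting, with the timeout that
        // bounds how long a completely blacklisted (unschedulable) taskset may wait
        // for new executors before being aborted.
        import org.apache.spark.SparkConf
    
        val conf = new SparkConf()
          .set("spark.dynamicAllocation.enabled", "true")
          // assumption: shuffle tracking is used instead of an external shuffle service
          .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
          .set("spark.blacklist.enabled", "true")
          .set("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "120s")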
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests in both ExecutorAllocationManagerSuite and TaskSchedulerImplSuite.
    
    Closes #28287 from venkata91/SPARK-31418.
    
    Authored-by: Venkata krishnan Sowrirajan <vs...@linkedin.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/SparkFirehoseListener.java    |  10 ++
 .../apache/spark/ExecutorAllocationManager.scala   |  55 ++++++-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  38 +++++
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |   8 +
 .../org/apache/spark/scheduler/SparkListener.scala |  30 ++++
 .../apache/spark/scheduler/SparkListenerBus.scala  |   4 +
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  55 +++++--
 .../spark/ExecutorAllocationManagerSuite.scala     | 175 ++++++++++++++++++++-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |  36 +++++
 9 files changed, 392 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 579e7ff..c0e72b5 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -162,6 +162,16 @@ public class SparkFirehoseListener implements SparkListenerInterface {
     onEvent(speculativeTask);
   }
 
+  public void onUnschedulableTaskSetAdded(
+      SparkListenerUnschedulableTaskSetAdded unschedulableTaskSetAdded) {
+    onEvent(unschedulableTaskSetAdded);
+  }
+
+  public void onUnschedulableTaskSetRemoved(
+      SparkListenerUnschedulableTaskSetRemoved unschedulableTaskSetRemoved) {
+    onEvent(unschedulableTaskSetRemoved);
+  }
+
   @Override
   public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
     onEvent(event);
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 620a6fe..85409d5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -281,6 +281,7 @@ private[spark] class ExecutorAllocationManager(
   private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
     val pending = listener.totalPendingTasksPerResourceProfile(rpId)
     val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
+    val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
     val running = listener.totalRunningTasksPerResourceProfile(rpId)
     val numRunningOrPendingTasks = pending + running
     val rp = resourceProfileManager.resourceProfileFromId(rpId)
@@ -289,13 +290,27 @@ private[spark] class ExecutorAllocationManager(
       s" tasksperexecutor: $tasksPerExecutor")
     val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
       tasksPerExecutor).toInt
-    if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+
+    val maxNeededWithSpeculationLocalityOffset =
+      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
       // If we have pending speculative tasks and only need a single executor, allocate one more
       // to satisfy the locality requirements of speculation
       maxNeeded + 1
     } else {
       maxNeeded
     }
+
+    if (unschedulableTaskSets > 0) {
+      // Request additional executors to account for task sets whose tasks are unschedulable
+      // due to blacklisting, once the active executor count has already reached the maximum
+      // we would normally need.
+      val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
+        tasksPerExecutor).toInt
+      math.max(maxNeededWithSpeculationLocalityOffset,
+        executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables)
+    } else {
+      maxNeededWithSpeculationLocalityOffset
+    }
   }
 
   private def totalRunningTasksPerResourceProfile(id: Int): Int = synchronized {
@@ -622,6 +637,12 @@ private[spark] class ExecutorAllocationManager(
     private val resourceProfileIdToStageAttempt =
       new mutable.HashMap[Int, mutable.Set[StageAttempt]]
 
+    // Keep track of unschedulable task sets caused by blacklisting. This is a Set of StageAttempts
+    // because we only track the last unschedulable task in a taskset although there can be more.
+    // This is done to avoid costly loops in the scheduling.
+    // See TaskSetManager#getCompletelyBlacklistedTaskIfAny for more details.
+    private val unschedulableTaskSets = new mutable.HashSet[StageAttempt]
+
     // stageAttempt to tuple (the number of task with locality preferences, a map where each pair
     // is a node and the number of tasks that would like to be scheduled on that node, and
     // the resource profile id) map,
@@ -789,6 +810,28 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
+    override def onUnschedulableTaskSetAdded(
+        unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
+      val stageId = unschedulableTaskSetAdded.stageId
+      val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      allocationManager.synchronized {
+        unschedulableTaskSets.add(stageAttempt)
+        allocationManager.onSchedulerBacklogged()
+      }
+    }
+
+    override def onUnschedulableTaskSetRemoved(
+        unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
+      val stageId = unschedulableTaskSetRemoved.stageId
+      val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
+      val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      allocationManager.synchronized {
+        // Clear unschedulableTaskSets since at least one task has become schedulable now
+        unschedulableTaskSets.remove(stageAttempt)
+      }
+    }
+
     /**
      * An estimate of the total number of pending tasks remaining for currently running stages. Does
      * not account for tasks which may have failed and been resubmitted.
@@ -829,6 +872,16 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
 
+    /**
+     * Currently we only know when a task set has an unschedulable task; we don't know
+     * the exact number. Since the allocation manager isn't tied closely to the scheduler,
+     * we use the number of task sets that are unschedulable as a heuristic for adding more executors.
+     */
+    def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
+      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
+      attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+    }
+
     def hasPendingTasks: Boolean = {
       hasPendingSpeculativeTasks || hasPendingRegularTasks
     }
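
To illustrate the arithmetic in the ExecutorAllocationManager change above, here is a minimal,
self-contained sketch; the object name, parameter names, and the standalone function are
illustrative simplifications, not Spark's internal API:

    // Sketch of maxNumExecutorsNeededPerResourceProfile with the new unschedulable-taskset term.
    object ExecutorNeedSketch {
      def maxNeeded(
          pending: Int,
          running: Int,
          pendingSpeculative: Int,
          unschedulableTaskSets: Int,
          currentExecutors: Int,
          tasksPerExecutor: Int,
          allocationRatio: Double = 1.0): Int = {
        val base = math.ceil((pending + running) * allocationRatio / tasksPerExecutor).toInt
        // Allocate one extra executor when speculative copies need somewhere else to run.
        val withSpeculation =
          if (tasksPerExecutor > 1 && base == 1 && pendingSpeculative > 0) base + 1 else base
        if (unschedulableTaskSets > 0) {
          // Every current executor is blacklisted for the stuck tasks, so ask for executors
          // beyond the ones we already have.
          val extra = math.ceil(unschedulableTaskSets * allocationRatio / tasksPerExecutor).toInt
          math.max(withSpeculation, currentExecutors + extra)
        } else {
          withSpeculation
        }
      }
    }

For example, with 2 running tasks, 2 tasks per executor, 1 current executor, and 1 unschedulable
task set, the sketch returns max(1, 1 + 1) = 2, which matches the "one stage being unschedulable"
test added to ExecutorAllocationManagerSuite below.
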
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 73c95d1..2503ae0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -332,6 +332,26 @@ private[spark] class DAGScheduler(
     eventProcessLoop.post(SpeculativeTaskSubmitted(task))
   }
 
+  /**
+   * Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
+   * dynamic allocation is enabled.
+   */
+  def unschedulableTaskSetAdded(
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
+    eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId))
+  }
+
+  /**
+   * Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic
+   * allocation is enabled.
+   */
+  def unschedulableTaskSetRemoved(
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
+    eventProcessLoop.post(UnschedulableTaskSetRemoved(stageId, stageAttemptId))
+  }
+
   private[scheduler]
   def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
     // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
@@ -1035,6 +1055,18 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
   }
 
+  private[scheduler] def handleUnschedulableTaskSetAdded(
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
+    listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
+  }
+
+  private[scheduler] def handleUnschedulableTaskSetRemoved(
+      stageId: Int,
+      stageAttemptId: Int): Unit = {
+    listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
+  }
+
   private[scheduler] def handleTaskSetFailed(
       taskSet: TaskSet,
       reason: String,
@@ -2321,6 +2353,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case SpeculativeTaskSubmitted(task) =>
       dagScheduler.handleSpeculativeTaskSubmitted(task)
 
+    case UnschedulableTaskSetAdded(stageId, stageAttemptId) =>
+      dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId)
+
+    case UnschedulableTaskSetRemoved(stageId, stageAttemptId) =>
+      dagScheduler.handleUnschedulableTaskSetRemoved(stageId, stageAttemptId)
+
     case GettingResultEvent(taskInfo) =>
       dagScheduler.handleGetTaskResult(taskInfo)
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 78d4583..d226fe8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -97,3 +97,11 @@ private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
 private[scheduler]
 case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
 
+private[scheduler]
+case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
+  extends DAGSchedulerEvent
+
+private[scheduler]
+case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)
+  extends DAGSchedulerEvent
+
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 62d54f3..8119215 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -158,6 +158,16 @@ case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)
   extends SparkListenerEvent
 
 @DeveloperApi
+case class SparkListenerUnschedulableTaskSetAdded(
+  stageId: Int,
+  stageAttemptId: Int) extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerUnschedulableTaskSetRemoved(
+  stageId: Int,
+  stageAttemptId: Int) extends SparkListenerEvent
+
+@DeveloperApi
 case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
 
 /**
@@ -340,6 +350,20 @@ private[spark] trait SparkListenerInterface {
   def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
 
   /**
+   * Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
+   * is enabled.
+   */
+  def onUnschedulableTaskSetAdded(
+      unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit
+
+  /**
+   * Called when an unschedulable taskset becomes schedulable and dynamic allocation
+   * is enabled.
+   */
+  def onUnschedulableTaskSetRemoved(
+      unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit
+
+  /**
    * Called when the driver receives a block update info.
    */
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
@@ -425,6 +449,12 @@ abstract class SparkListener extends SparkListenerInterface {
   override def onNodeUnblacklisted(
       nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
 
+  override def onUnschedulableTaskSetAdded(
+      unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = { }
+
+  override def onUnschedulableTaskSetRemoved(
+      unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = { }
+
   override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
 
   override def onSpeculativeTaskSubmitted(
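
Because the two new events are marked @DeveloperApi, external listeners can observe them as well.
A minimal sketch of such a listener; the class name and the println bodies are illustrative
assumptions:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerUnschedulableTaskSetAdded,
      SparkListenerUnschedulableTaskSetRemoved}

    // Register with sc.addSparkListener(new UnschedulableTaskSetLogger) to observe the events.
    class UnschedulableTaskSetLogger extends SparkListener {
      override def onUnschedulableTaskSetAdded(
          event: SparkListenerUnschedulableTaskSetAdded): Unit = {
        // No live executor can run some task of this stage attempt due to blacklisting.
        println(s"Stage ${event.stageId} (attempt ${event.stageAttemptId}) has unschedulable tasks")
      }

      override def onUnschedulableTaskSetRemoved(
          event: SparkListenerUnschedulableTaskSetRemoved): Unit = {
        println(s"Stage ${event.stageId} (attempt ${event.stageAttemptId}) is schedulable again")
      }
    }
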
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 3d316c9..13e65f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -79,6 +79,10 @@ private[spark] trait SparkListenerBus
         listener.onBlockUpdated(blockUpdated)
       case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
         listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
+      case unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded =>
+        listener.onUnschedulableTaskSetAdded(unschedulableTaskSetAdded)
+      case unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved =>
+        listener.onUnschedulableTaskSetRemoved(unschedulableTaskSetRemoved)
       case resourceProfileAdded: SparkListenerResourceProfileAdded =>
         listener.onResourceProfileAdded(resourceProfileAdded)
       case _ => listener.onOtherEvent(event)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 28e138e..510318a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -637,10 +637,9 @@ private[spark] class TaskSchedulerImpl(
         if (!launchedAnyTask) {
           taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
               // If the taskSet is unschedulable we try to find an existing idle blacklisted
-              // executor. If we cannot find one, we abort immediately. Else we kill the idle
-              // executor and kick off an abortTimer which if it doesn't schedule a task within the
-              // the timeout will abort the taskSet if we were unable to schedule any task from the
-              // taskSet.
+              // executor. If found, we kill the idle executor and kick off an abortTimer which,
+              // if it doesn't schedule a task within the timeout, will abort the taskSet if we
+              // were unable to schedule any task from the taskSet.
               // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
               // task basis.
               // Note 2: The taskSet can still be aborted when there are more than one idle
@@ -648,22 +647,33 @@ private[spark] class TaskSchedulerImpl(
               // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
               // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
               // timer to expire and abort the taskSet.
+              //
+              // If there are no idle executors and dynamic allocation is enabled, we notify
+              // ExecutorAllocationManager to allocate more executors to schedule the
+              // unschedulable tasks; otherwise we abort immediately.
               executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
                 case Some ((executorId, _)) =>
                   if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                     blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
-
-                    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
-                    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
-                    logInfo(s"Waiting for $timeout ms for completely "
-                      + s"blacklisted task to be schedulable again before aborting $taskSet.")
-                    abortTimer.schedule(
-                      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+                    updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
+                  }
+                case None =>
+                  // Notify ExecutorAllocationManager about the unschedulable task set,
+                  // in order to provision more executors to make them schedulable
+                  if (Utils.isDynamicAllocationEnabled(conf)) {
+                    if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                      logInfo(s"Notifying ExecutorAllocationManager to allocate more executors to" +
+                        s" schedule the unschedulable task before aborting $taskSet.")
+                      dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId,
+                        taskSet.taskSet.stageAttemptId)
+                      updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
+                    }
+                  } else {
+                    // Abort Immediately
+                    logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+                      s" executors can be found to kill. Aborting $taskSet.")
+                    taskSet.abortSinceCompletelyBlacklisted(taskIndex)
                   }
-                case None => // Abort Immediately
-                  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
-                    s" executors can be found to kill. Aborting $taskSet." )
-                  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
               }
           }
         } else {
@@ -676,6 +686,10 @@ private[spark] class TaskSchedulerImpl(
           if (unschedulableTaskSetToExpiryTime.nonEmpty) {
             logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
               "recently scheduled.")
+            // Notify ExecutorAllocationManager as well as other subscribers that a task has
+            // recently become schedulable
+            dagScheduler.unschedulableTaskSetRemoved(taskSet.taskSet.stageId,
+              taskSet.taskSet.stageAttemptId)
             unschedulableTaskSetToExpiryTime.clear()
           }
         }
@@ -722,6 +736,17 @@ private[spark] class TaskSchedulerImpl(
     return tasks.map(_.toSeq)
   }
 
+  private def updateUnschedulableTaskSetTimeoutAndStartAbortTimer(
+      taskSet: TaskSetManager,
+      taskIndex: Int): Unit = {
+    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
+    logInfo(s"Waiting for $timeout ms for completely " +
+      s"blacklisted task to be schedulable again before aborting $taskSet.")
+    abortTimer.schedule(
+      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+  }
+
   private def createUnschedulableTaskSetAbortTimer(
       taskSet: TaskSetManager,
       taskIndex: Int): TimerTask = {
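
The control flow added to resourceOffers above can be summarized with a small decision sketch;
the object and the boolean parameters are a paraphrase for readability, not the actual scheduler
code:

    object UnschedulableTaskSetPolicy {
      sealed trait Action
      case object KillIdleExecutorAndStartAbortTimer extends Action
      case object NotifyAllocationManagerAndStartAbortTimer extends Action
      case object AbortImmediately extends Action

      // What to do when a taskSet has a task that is blacklisted on every live executor.
      def decide(hasIdleExecutor: Boolean, dynamicAllocationEnabled: Boolean): Action = {
        if (hasIdleExecutor) {
          // Kill an idle blacklisted executor so its replacement may pick up the task,
          // and start the abort timer in case that never happens.
          KillIdleExecutorAndStartAbortTimer
        } else if (dynamicAllocationEnabled) {
          // Ask ExecutorAllocationManager for more executors, then wait up to
          // spark.scheduler.blacklist.unschedulableTaskSetTimeout before aborting.
          NotifyAllocationManagerAndStartAbortTimer
        } else {
          AbortImmediately
        }
      }
    }
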
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 8037f4a..ea6e010 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -21,15 +21,15 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
-import org.mockito.ArgumentMatchers.{any, eq => meq}
-import org.mockito.Mockito.{mock, never, times, verify, when}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, ResourceProfileManager, TaskResourceRequests}
+import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -501,6 +501,175 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsToAddForDefaultProfile(manager) === 1)
   }
 
+  test("SPARK-31418: one stage being unschedulable") {
+    val clock = new ManualClock()
+    val conf = createConf(0, 5, 0).set(config.EXECUTOR_CORES, 2)
+    val manager = createManager(conf, clock = clock)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
+
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+
+    onExecutorAddedDefaultProfile(manager, "0")
+    val t1 = createTaskInfo(0, 0, executorId = s"0")
+    val t2 = createTaskInfo(1, 1, executorId = s"0")
+    post(SparkListenerTaskStart(0, 0, t1))
+    post(SparkListenerTaskStart(0, 0, t2))
+
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
+
+    // Stage 0 becomes unschedulable due to blacklisting
+    post(SparkListenerUnschedulableTaskSetAdded(0, 0))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    // Assert that we are getting an additional executor to schedule unschedulable tasks
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
+
+    // Add a new executor
+    onExecutorAddedDefaultProfile(manager, "1")
+    // Now once the task becomes schedulable, clear the unschedulableTaskSets
+    post(SparkListenerUnschedulableTaskSetRemoved(0, 0))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
+  }
+
+  test("SPARK-31418: multiple stages being unschedulable") {
+    val clock = new ManualClock()
+    val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 2)
+    val manager = createManager(conf, clock = clock)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
+    post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    post(SparkListenerStageSubmitted(createStageInfo(2, 2)))
+
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+
+    // Add necessary executors
+    (0 to 2).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
+
+    // Start all the tasks
+    (0 to 2).foreach {
+      i =>
+        val t1Info = createTaskInfo(0, (i * 2) + 1, executorId = s"${i / 2}")
+        val t2Info = createTaskInfo(1, (i * 2) + 2, executorId = s"${i / 2}")
+        post(SparkListenerTaskStart(i, 0, t1Info))
+        post(SparkListenerTaskStart(i, 0, t2Info))
+    }
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)
+
+    // Complete the stage 0 tasks.
+    val t1Info = createTaskInfo(0, 0, executorId = s"0")
+    val t2Info = createTaskInfo(1, 1, executorId = s"0")
+    post(SparkListenerTaskEnd(0, 0, null, Success, t1Info, new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, Success, t2Info, new ExecutorMetrics, null))
+    post(SparkListenerStageCompleted(createStageInfo(0, 2)))
+
+    // Stages 1 and 2 become unschedulable now due to blacklisting
+    post(SparkListenerUnschedulableTaskSetAdded(1, 0))
+    post(SparkListenerUnschedulableTaskSetAdded(2, 0))
+
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    // Assert that we are getting an additional executor to schedule unschedulable tasks
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 4)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 4)
+
+    // Add a new executor
+    onExecutorAddedDefaultProfile(manager, "3")
+
+    // Now once the task becomes schedulable, clear the unschedulableTaskSets
+    post(SparkListenerUnschedulableTaskSetRemoved(1, 0))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 4)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
+  }
+
+  test("SPARK-31418: remove executors after unschedulable tasks end") {
+    val clock = new ManualClock()
+    val stage = createStageInfo(0, 10)
+    val conf = createConf(0, 6, 0).set(config.EXECUTOR_CORES, 2)
+    val manager = createManager(conf, clock = clock)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    post(SparkListenerStageSubmitted(stage))
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+
+    (0 to 4).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
+    (0 to 9).map { i => createTaskInfo(i, i, executorId = s"${i / 2}") }.foreach {
+      info => post(SparkListenerTaskStart(0, 0, info))
+    }
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 5)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
+
+    // 8 tasks (0 - 7) finished
+    (0 to 7).map { i => createTaskInfo(i, i, executorId = s"${i / 2}") }.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))
+    }
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
+    (0 to 3).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString)) }
+    (0 to 3).foreach { i => onExecutorRemoved(manager, i.toString) }
+
+    // Now due to blacklisting, the task becomes unschedulable
+    post(SparkListenerUnschedulableTaskSetAdded(0, 0))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
+
+    // New executor got added
+    onExecutorAddedDefaultProfile(manager, "5")
+
+    // Now once the task becomes schedulable, clear the unschedulableTaskSets
+    post(SparkListenerUnschedulableTaskSetRemoved(0, 0))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(9, 9, "4"), new ExecutorMetrics, null))
+    // Unschedulable task successfully ran on the new executor provisioned
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(8, 8, "5"), new ExecutorMetrics, null))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    post(SparkListenerStageCompleted(stage))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 0)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 0)
+    assert(removeExecutorDefaultProfile(manager, "4"))
+    onExecutorRemoved(manager, "4")
+    assert(removeExecutorDefaultProfile(manager, "5"))
+    onExecutorRemoved(manager, "5")
+  }
+
   test("SPARK-30511 remove executors when speculative tasks end") {
     val clock = new ManualClock()
     val stage = createStageInfo(0, 40)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index e43be60..9ca3ce9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1000,6 +1000,42 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!tsm.isZombie)
   }
 
+  test("SPARK-31418 abort timer should kick in when task is completely blacklisted &" +
+    "allocation manager could not acquire a new executor before the timeout") {
+    // set the abort timer to fail immediately
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0",
+      config.DYN_ALLOCATION_ENABLED.key -> "true")
+
+    // We have 2 tasks remaining with 1 executor
+    val taskSet = FakeTask.createTaskSet(numTasks = 2)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // submit an offer with one executor
+    taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor0", "host0", 2))).flatten
+
+    // Fail the running task
+    failTask(0, TaskState.FAILED, UnknownReason, tsm)
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", 0)).thenReturn(true)
+
+    // If the executor is busy, then dynamic allocation should kick in and try
+    // to acquire additional executors to schedule the blacklisted task
+    assert(taskScheduler.isExecutorBusy("executor0"))
+
+    // make an offer on the blacklisted executor.  We won't schedule anything, and set the abort
+    // timer to kick in immediately
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten.size === 0)
+    // Wait for the abort timer to kick in. Even though we configure the timeout to be 0, there is a
+    // slight delay as the abort timer is launched in a separate thread.
+    eventually(timeout(500.milliseconds)) {
+      assert(tsm.isZombie)
+    }
+  }
+
   /**
    * Helper for performance tests.  Takes the explicitly blacklisted nodes and executors; verifies
    * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org