Posted to commits@spark.apache.org by tg...@apache.org on 2020/01/31 14:49:58 UTC

[spark] branch master updated: [SPARK-30511][SPARK-28403][CORE] Don't treat failed/killed speculative tasks as pending in ExecutorAllocationManager

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 21bc047  [SPARK-30511][SPARK-28403][CORE] Don't treat failed/killed speculative tasks as pending in ExecutorAllocationManager
21bc047 is described below

commit 21bc0474bbb16c7648aed40f25a2945d98d2a167
Author: zebingl@fb.com <ze...@fb.com>
AuthorDate: Fri Jan 31 08:49:34 2020 -0600

    [SPARK-30511][SPARK-28403][CORE] Don't treat failed/killed speculative tasks as pending in ExecutorAllocationManager
    
    ### What changes were proposed in this pull request?
    
    Currently, when speculative tasks fail or get killed, they are still considered pending and count towards the calculation of the number of needed executors. To be more accurate: `stageAttemptToNumSpeculativeTasks(stageAttempt)` is incremented in `onSpeculativeTaskSubmitted`, but never decremented; `stageAttemptToNumSpeculativeTasks -= stageAttempt` is only performed on stage completion. **This means Spark is marking ended speculative tasks as pending, which leads Spark to hold more executors [...]
    
    This PR fixes the issue by updating `stageAttemptToSpeculativeTaskIndices` and `stageAttemptToNumSpeculativeTasks` when a speculative task completes. It also addresses some other minor issues: the scheduler's behavior after receiving an intentionally killed task event, and an attempt to address [SPARK-28403](https://issues.apache.org/jira/browse/SPARK-28403).
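
    A minimal, self-contained sketch of the bookkeeping described above (the field and
    callback names mirror the listener in the diff below, but this is a simplification,
    not the real `ExecutorAllocationManager` code): pending speculative tasks are those
    that have been submitted but not yet started, so both structures have to be unwound
    when a speculative task ends.
    
    ```
    import scala.collection.mutable
    
    object SpeculationBookkeepingSketch {
      type StageAttempt = (Int, Int) // (stageId, stageAttemptId)
    
      // Speculative tasks submitted (pending or running) per stage attempt
      private val stageAttemptToNumSpeculativeTasks = mutable.HashMap[StageAttempt, Int]()
      // Indices of speculative tasks that have actually started
      private val stageAttemptToSpeculativeTaskIndices =
        mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
    
      def onSpeculativeTaskSubmitted(sa: StageAttempt): Unit = {
        stageAttemptToNumSpeculativeTasks(sa) =
          stageAttemptToNumSpeculativeTasks.getOrElse(sa, 0) + 1
      }
    
      def onSpeculativeTaskStart(sa: StageAttempt, taskIndex: Int): Unit = {
        stageAttemptToSpeculativeTaskIndices
          .getOrElseUpdate(sa, mutable.HashSet.empty[Int]) += taskIndex
      }
    
      // The fix: unwind both structures when a speculative task ends for any reason,
      // so a finished/failed/killed speculative copy no longer counts as pending
      def onSpeculativeTaskEnd(sa: StageAttempt, taskIndex: Int): Unit = {
        stageAttemptToSpeculativeTaskIndices.get(sa).foreach(_.remove(taskIndex))
        stageAttemptToNumSpeculativeTasks(sa) =
          stageAttemptToNumSpeculativeTasks.getOrElse(sa, 1) - 1
      }
    
      // Pending speculative tasks = submitted but not yet started
      def pendingSpeculativeTasks: Int = {
        stageAttemptToNumSpeculativeTasks.map { case (sa, num) =>
          num - stageAttemptToSpeculativeTaskIndices.get(sa).map(_.size).getOrElse(0)
        }.sum
      }
    }
    ```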
    
    ### Why are the changes needed?
    
    This has caused resource waste in our production environment with speculation enabled. With aggressive speculation, we found that data-skewed jobs can hold hundreds of idle executors while fewer than 10 tasks are running.
    
    An easy repro of the issue (`--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=1000` in cluster mode):
    ```
    val n = 4000
    val someRDD = sc.parallelize(1 to n, n)
    someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
      if (index < 300 && index >= 150) {
        Thread.sleep(index * 1000) // Fake running tasks
      } else if (index == 300) {
        Thread.sleep(1000 * 1000) // Fake long running tasks
      }
      it.toList.map(x => index + ", " + x).iterator
    }).collect
    ```
    You will see that while running the last task, we would be holding 38 executors (see below), which is exactly (152 + 3) / 4 = 38; the sizing formula behind this number is sketched after the screenshot.
    ![image](https://user-images.githubusercontent.com/9404831/72469112-9a7fac00-3793-11ea-8f50-74d0ab7325a4.png)
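
    For reference, the executor target is driven by the sizing formula below, a condensed
    sketch of `maxNumExecutorsNeeded` from the patch (parameter names here are illustrative,
    not the real field names):
    
    ```
    // Rough sketch: how many executors are needed for the pending + running tasks.
    // Ended speculative tasks that are still (wrongly) counted as pending keep this
    // value, and therefore the executor target, inflated.
    def maxNumExecutorsNeeded(
        pendingTasks: Int,
        runningTasks: Int,
        executorAllocationRatio: Double,
        tasksPerExecutor: Int): Int = {
      math.ceil((pendingTasks + runningTasks) * executorAllocationRatio / tasksPerExecutor).toInt
    }
    ```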
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a comprehensive unit test.
    
    Testing with the above repro shows that we are holding 2 executors at the end:
    ![image](https://user-images.githubusercontent.com/9404831/72469177-bbe09800-3793-11ea-850f-4a2c67142899.png)
    
    Closes #27223 from linzebing/speculation_fix.
    
    Authored-by: zebingl@fb.com <ze...@fb.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../apache/spark/ExecutorAllocationManager.scala   |  61 ++++++----
 .../spark/ExecutorAllocationManagerSuite.scala     | 135 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bff854a..677386c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -263,9 +263,16 @@ private[spark] class ExecutorAllocationManager(
    */
   private def maxNumExecutorsNeeded(): Int = {
     val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
-    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
-              tasksPerExecutorForFullParallelism)
-      .toInt
+    val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
+      tasksPerExecutorForFullParallelism).toInt
+    if (tasksPerExecutorForFullParallelism > 1 && maxNeeded == 1 &&
+      listener.pendingSpeculativeTasks > 0) {
+      // If we have pending speculative tasks and only need a single executor, allocate one more
+      // to satisfy the locality requirements of speculation
+      maxNeeded + 1
+    } else {
+      maxNeeded
+    }
   }
 
   private def totalRunningTasks(): Int = synchronized {
@@ -377,14 +384,8 @@ private[spark] class ExecutorAllocationManager(
     // If our target has not changed, do not send a message
     // to the cluster manager and reset our exponential growth
     if (delta == 0) {
-      // Check if there is any speculative jobs pending
-      if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
-        numExecutorsTarget =
-          math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
-      } else {
-        numExecutorsToAdd = 1
-        return 0
-      }
+      numExecutorsToAdd = 1
+      return 0
     }
 
     val addRequestAcknowledged = try {
@@ -512,7 +513,7 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks to be scheduled in each stageAttempt
+    // Number of speculative tasks pending/running in each stageAttempt
     private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
     // The speculative tasks started in each stageAttempt
     private val stageAttemptToSpeculativeTaskIndices =
@@ -614,18 +615,30 @@ private[spark] class ExecutorAllocationManager(
             stageAttemptToNumRunningTask -= stageAttempt
           }
         }
-        // If the task failed, we expect it to be resubmitted later. To ensure we have
-        // enough resources to run the resubmitted task, we need to mark the scheduler
-        // as backlogged again if it's not already marked as such (SPARK-8366)
-        if (taskEnd.reason != Success) {
-          if (totalPendingTasks() == 0) {
-            allocationManager.onSchedulerBacklogged()
-          }
-          if (taskEnd.taskInfo.speculative) {
-            stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
-          } else {
-            stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
-          }
+
+        if (taskEnd.taskInfo.speculative) {
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
+          stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
+        }
+
+        taskEnd.reason match {
+          case Success | _: TaskKilled =>
+          case _ =>
+            if (totalPendingTasks() == 0) {
+              // If the task failed (not intentionally killed), we expect it to be resubmitted
+              // later. To ensure we have enough resources to run the resubmitted task, we need to
+              // mark the scheduler as backlogged again if it's not already marked as such
+              // (SPARK-8366)
+              allocationManager.onSchedulerBacklogged()
+            }
+            if (!taskEnd.taskInfo.speculative) {
+              // If a non-speculative task is intentionally killed, it means the speculative task
+              // has succeeded, and no further task of this task index will be resubmitted. In this
+              // case, the task index is completed and we shouldn't remove it from
+              // stageAttemptToTaskIndices. Otherwise, we will have a pending non-speculative task
+              // for the task index (SPARK-30511)
+              stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
+            }
         }
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 99f3e3b..8d95849 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -264,6 +264,141 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("SPARK-30511 remove executors when speculative tasks end") {
+    val clock = new ManualClock()
+    val stage = createStageInfo(0, 40)
+    val manager = createManager(createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4), clock = clock)
+
+    post(SparkListenerStageSubmitted(stage))
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 2)
+    assert(addExecutors(manager) === 4)
+    assert(addExecutors(manager) === 3)
+
+    (0 to 9).foreach(execId => onExecutorAdded(manager, execId.toString))
+    (0 to 39).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
+      info => post(SparkListenerTaskStart(0, 0, info))
+    }
+    assert(numExecutorsTarget(manager) === 10)
+    assert(maxNumExecutorsNeeded(manager) == 10)
+
+    // 30 tasks (0 - 29) finished
+    (0 to 29).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) }
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 3)
+    assert(maxNumExecutorsNeeded(manager) == 3)
+    (0 to 6).foreach { i => assert(removeExecutor(manager, i.toString))}
+    (0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)}
+
+    // 10 speculative tasks (30 - 39) launch for the remaining tasks
+    (30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))}
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsTarget(manager) == 5)
+    assert(maxNumExecutorsNeeded(manager) == 5)
+    (10 to 12).foreach(execId => onExecutorAdded(manager, execId.toString))
+    (40 to 49).map { i =>
+      createTaskInfo(taskId = i, taskIndex = i - 10, executorId = s"${i / 4}", speculative = true)}
+      .foreach { info => post(SparkListenerTaskStart(0, 0, info))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) == 5) // At this point, we still have 6 executors running
+    assert(maxNumExecutorsNeeded(manager) == 5)
+
+    // 6 speculative tasks (40 - 45) finish before the original tasks, with 4 speculative remaining
+    (40 to 45).map { i =>
+      createTaskInfo(taskId = i, taskIndex = i - 10, executorId = s"${i / 4}", speculative = true)}
+      .foreach {
+        info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 4)
+    assert(maxNumExecutorsNeeded(manager) == 4)
+    assert(removeExecutor(manager, "10"))
+    onExecutorRemoved(manager, "10")
+    // At this point, we still have 5 executors running: ["7", "8", "9", "11", "12"]
+
+    // 6 original tasks (30 - 35) are intentionally killed
+    (30 to 35).map { i =>
+      createTaskInfo(i, i, executorId = s"${i / 4}")}
+      .foreach { info => post(
+        SparkListenerTaskEnd(0, 0, null, TaskKilled("test"), info, new ExecutorMetrics, null))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 2)
+    assert(maxNumExecutorsNeeded(manager) == 2)
+    (7 to 8).foreach { i => assert(removeExecutor(manager, i.toString))}
+    (7 to 8).foreach { i => onExecutorRemoved(manager, i.toString)}
+    // At this point, we still have 3 executors running: ["9", "11", "12"]
+
+    // Task 36 finishes before the speculative task 46, task 46 killed
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(36, 36, executorId = "9"), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, TaskKilled("test"),
+      createTaskInfo(46, 36, executorId = "11", speculative = true), new ExecutorMetrics, null))
+
+    // We should have 3 original tasks (index 37, 38, 39) running, with corresponding 3 speculative
+    // tasks running. Target lowers to 2, but still hold 3 executors ["9", "11", "12"]
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 2)
+    assert(maxNumExecutorsNeeded(manager) == 2)
+    // At this point, we still have 3 executors running: ["9", "11", "12"]
+
+    // Task 37 and 47 succeed at the same time
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(37, 37, executorId = "9"), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(47, 37, executorId = "11", speculative = true), new ExecutorMetrics, null))
+
+    // We should have 2 original tasks (index 38, 39) running, with corresponding 2 speculative
+    // tasks running
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 1)
+    assert(maxNumExecutorsNeeded(manager) == 1)
+    assert(removeExecutor(manager, "11"))
+    onExecutorRemoved(manager, "11")
+    // At this point, we still have 2 executors running: ["9", "12"]
+
+    // Task 38 fails and task 49 fails, new speculative task 50 is submitted to speculate on task 39
+    post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
+      createTaskInfo(38, 38, executorId = "9"), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
+      createTaskInfo(49, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
+    post(SparkListenerSpeculativeTaskSubmitted(0))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    // maxNeeded = 1, allocate one more to satisfy speculation locality requirement
+    assert(numExecutorsTarget(manager) === 2)
+    assert(maxNumExecutorsNeeded(manager) == 2)
+    post(SparkListenerTaskStart(0, 0,
+      createTaskInfo(50, 39, executorId = "12", speculative = true)))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 1)
+    assert(maxNumExecutorsNeeded(manager) == 1)
+
+    // Task 39 and 48 succeed, task 50 killed
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(39, 39, executorId = "9"), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, Success,
+      createTaskInfo(48, 38, executorId = "12", speculative = true), new ExecutorMetrics, null))
+    post(SparkListenerTaskEnd(0, 0, null, TaskKilled("test"),
+      createTaskInfo(50, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
+    post(SparkListenerStageCompleted(stage))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager) === 0)
+    assert(maxNumExecutorsNeeded(manager) == 0)
+    assert(removeExecutor(manager, "9"))
+    onExecutorRemoved(manager, "9")
+    assert(removeExecutor(manager, "12"))
+    onExecutorRemoved(manager, "12")
+  }
+
   test("properly handle task end events from completed stages") {
     val manager = createManager(createConf(0, 10, 0))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org