Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/02/03 03:45:39 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #27223: [SPARK-30511][SPARK-28403][CORE] Don't treat failed/killed speculative tasks as pending in ExecutorAllocationManager

Ngone51 commented on a change in pull request #27223: [SPARK-30511][SPARK-28403][CORE] Don't treat failed/killed speculative tasks as pending in ExecutorAllocationManager
URL: https://github.com/apache/spark/pull/27223#discussion_r373912055
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 ##########
 @@ -614,18 +615,30 @@ private[spark] class ExecutorAllocationManager(
             stageAttemptToNumRunningTask -= stageAttempt
           }
         }
-        // If the task failed, we expect it to be resubmitted later. To ensure we have
-        // enough resources to run the resubmitted task, we need to mark the scheduler
-        // as backlogged again if it's not already marked as such (SPARK-8366)
-        if (taskEnd.reason != Success) {
-          if (totalPendingTasks() == 0) {
-            allocationManager.onSchedulerBacklogged()
-          }
-          if (taskEnd.taskInfo.speculative) {
-            stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
-          } else {
-            stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
-          }
+
+        if (taskEnd.taskInfo.speculative) {
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
+          stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
+        }
+
+        taskEnd.reason match {
+          case Success | _: TaskKilled =>
+          case _ =>
+            if (totalPendingTasks() == 0) {
+              // If the task failed (not intentionally killed), we expect it to be resubmitted
+              // later. To ensure we have enough resources to run the resubmitted task, we need to
+              // mark the scheduler as backlogged again if it's not already marked as such
+              // (SPARK-8366)
+              allocationManager.onSchedulerBacklogged()
 
 Review comment:
   Previously, we always called `onSchedulerBacklogged` when `totalPendingTasks() == 0`, for both speculative and non-speculative tasks. But according to this PR, we should no longer do this for speculative tasks. Right? @linzebing
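
   To make the behavior change under discussion concrete, here is a minimal, self-contained Scala sketch. It is not the actual Spark code: `BacklogSketch`, `TaskFailed`, and `shouldMarkBacklogged` are hypothetical stand-ins for `TaskEndReason`, its failure cases, and the backlog decision in `ExecutorAllocationManager`. It illustrates that a killed task (e.g. a speculative copy killed because the original attempt finished) no longer triggers the backlog path, while a genuine failure with no pending tasks still does:

```scala
// Hypothetical sketch of the decision made in the new `taskEnd.reason match` above.
object BacklogSketch {
  sealed trait Reason
  case object Success extends Reason
  final case class TaskKilled(msg: String) extends Reason
  final case class TaskFailed(msg: String) extends Reason

  // Mirrors the new match: only a real failure (not Success, not an intentional
  // kill) with nothing left pending re-marks the scheduler as backlogged, because
  // the failed task is expected to be resubmitted (SPARK-8366).
  def shouldMarkBacklogged(reason: Reason, totalPendingTasks: Int): Boolean =
    reason match {
      case Success | _: TaskKilled => false
      case _                       => totalPendingTasks == 0
    }

  def main(args: Array[String]): Unit = {
    // Speculative copy killed because another attempt succeeded: no backlog.
    println(shouldMarkBacklogged(TaskKilled("another attempt succeeded"), 0)) // false
    // Genuine failure with no other pending work: mark backlogged again.
    println(shouldMarkBacklogged(TaskFailed("fetch failure"), 0))             // true
    // Genuine failure but other tasks still pending: already backlogged.
    println(shouldMarkBacklogged(TaskFailed("fetch failure"), 3))             // false
  }
}
```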

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
