Posted to reviews@spark.apache.org by "Ngone51 (via GitHub)" <gi...@apache.org> on 2023/11/22 08:39:37 UTC

[PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Ngone51 opened a new pull request, #43954:
URL: https://github.com/apache/spark/pull/43954

   
   ### What changes were proposed in this pull request?
   
   This PR removes the interface `TaskScheduler.killAllTaskAttempts` and its implementations, replacing it with `TaskScheduler.cancelTasks`.
   
   
   ### Why are the changes needed?
   
   Spark has two functions to kill all tasks in a stage:
   * `cancelTasks`: kills all the running tasks in all the stage attempts and also aborts all the stage attempts.
   * `killAllTaskAttempts`: only kills all the running tasks in all the stage attempts but doesn't abort the attempts.
   
   
   However, there's no use case in Spark where a stage would launch new tasks after all of its tasks have been killed. So I think we can replace `killAllTaskAttempts` with `cancelTasks` directly (both signatures are sketched below for reference).
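   
   For reference, a condensed sketch of the two methods as they exist before this change, simplified from the `TaskScheduler` trait quoted in the review below (comments paraphrase the behavior; this is not the complete interface):
   
   ```scala
   // Simplified pre-PR TaskScheduler trait (sketch, not the full interface).
   private[spark] trait TaskScheduler {
     // Kills all running tasks in every attempt of the stage AND aborts the stage
     // attempts, failing the jobs that depend on the stage. Throws
     // UnsupportedOperationException if the backend doesn't support killing tasks.
     def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit
   
     // Kills all running tasks in every attempt of the stage, but leaves the stage
     // attempts alive, so the stage itself is not failed.
     def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
   }
   ```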
   
   
   ### Does this PR introduce _any_ user-facing change?
   No. `TaskScheduler` is internal.
   
   
   ### How was this patch tested?
   
   Pass existing tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1847130056

   To fix the tests, I had to move (https://github.com/apache/spark/pull/43954/commits/fe70ba95a67800b798443b8fa873d2b24efa2067) the "abort stage" call back into `cancelTasks()` behind the control flag, rather than calling "abort stage" after `cancelTasks()`. After the move, the timing of the "abort stage" call strictly follows the original behavior.
   
   The problem with calling "abort stage" after `cancelTasks()` is that, e.g., `DAGSchedulerSuite` has its own implementation of `TaskScheduler` which overrides `cancelTasks()` with no "abort stage". But our change extracted "abort stage" outside of `cancelTasks()`, so the intention to not call "abort stage" in `cancelTasks()` is broken and the test fails.
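   
   For illustration, a minimal sketch of the control-flag approach inside `TaskSchedulerImpl`, adapted from the diffs quoted later in this thread (`legacyAbortAfterCancelTasks` stands for reading the new conf; the exact patch may differ):
   
   ```scala
   override def cancelTasks(
       stageId: Int,
       interruptThread: Boolean,
       reason: String): Unit = synchronized {
     taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
       attempts.foreach { case (_, tsm) =>
         // Kill every running task of every attempt of this stage.
         tsm.runningTasksSet.foreach { tid =>
           taskIdToExecutorId.get(tid).foreach { execId =>
             backend.killTask(tid, execId, interruptThread, s"Stage cancelled: $reason")
           }
         }
         if (legacyAbortAfterCancelTasks) {
           // Legacy timing: abort inside cancelTasks(), so test TaskSchedulers that
           // override cancelTasks() without an abort keep their intended behavior.
           tsm.abort(s"Stage $stageId cancelled")
         } else {
           tsm.suspend() // mark zombie only; the stage is not failed here
         }
       }
     }
   }
   ```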
   
   
   
   




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1432410827


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -54,12 +54,30 @@ import org.apache.spark.util.ArrayImplicits._
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
 
+  dagScheduler.setEventProcessLoop(this)

Review Comment:
   The example is the test `DAGSchedulerSuite - trivial job failure`. The test explicitly posts `TaskSetFailed` via `DAGSchedulerEventProcessLoopTester`. While processing `TaskSetFailed`, the scheduler generates a `StageFailed` event (added by this PR) that is posted through `DAGSchedulerEventProcessLoop`.
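   
   A minimal sketch of how a synchronous test loop can defer such a nested event (an assumed shape, not the actual test code; `onReceive` comes from `EventLoop`):
   
   ```scala
   // Queue events so that an event posted while another is being processed
   // (e.g. StageFailed generated during TaskSetFailed) runs only after the
   // outer event completes, matching the ordering of the normal async loop.
   private val pending = new java.util.ArrayDeque[DAGSchedulerEvent]()
   private var processing = false
   
   override def post(event: DAGSchedulerEvent): Unit = {
     pending.addLast(event)
     if (!processing) {
       processing = true
       try {
         while (!pending.isEmpty) onReceive(pending.pollFirst())
       } finally {
         processing = false
       }
     }
   }
   ```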





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1402526009


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -302,10 +302,19 @@ private[spark] class TaskSchedulerImpl(
       reason: String): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     // Kill all running tasks for the stage.
-    killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " + reason)
-    // Cancel all attempts for the stage.
+    logInfo(s"Killing all running tasks in stage $stageId: $reason")
     taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
       attempts.foreach { case (_, tsm) =>
+        // There are two possible cases here:
+        // 1. The task set manager has been created and some tasks have been scheduled.
+        //    In this case, send a kill signal to the executors to kill the task.
+        // 2. The task set manager has been created but no tasks have been scheduled. In this case,
+        //    simply continue.
+        tsm.runningTasksSet.foreach { tid =>
+          taskIdToExecutorId.get(tid).foreach { execId =>
+            backend.killTask(tid, execId, interruptThread, reason)

Review Comment:
   In order to preserve the old behavior for the task kill reason, should this be
   
   ```
   reason = "Stage cancelled: " + reason
   ```
   
   instead?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408621909


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -66,10 +66,6 @@ private[spark] trait TaskScheduler {
    */
   def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
 
-  // Kill all the running task attempts in a stage.
-  // Throw UnsupportedOperationException if the backend doesn't support kill tasks.
-  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

Review Comment:
   Right, the whole `TaskScheduler` is private.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408994168


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -296,18 +296,31 @@ private[spark] class TaskSchedulerImpl(
     new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
   }
 
+  // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
+  // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
+  // once all the tasks have finished. The stage attempt could be aborted after the call of
+  // `cancelTasks` if required.
   override def cancelTasks(
       stageId: Int,
       interruptThread: Boolean,
       reason: String): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     // Kill all running tasks for the stage.
-    killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " + reason)
-    // Cancel all attempts for the stage.
+    logInfo(s"Killing all running tasks in stage $stageId: $reason")
     taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
       attempts.foreach { case (_, tsm) =>
-        tsm.abort("Stage %s cancelled".format(stageId))
-        logInfo("Stage %d was cancelled".format(stageId))
+        // There are two possible cases here:
+        // 1. The task set manager has been created and some tasks have been scheduled.
+        //    In this case, send a kill signal to the executors to kill the task.
+        // 2. The task set manager has been created but no tasks have been scheduled. In this case,
+        //    simply continue.
+        tsm.runningTasksSet.foreach { tid =>
+          taskIdToExecutorId.get(tid).foreach { execId =>
+            backend.killTask(tid, execId, interruptThread, s"Stage cancelled: $reason")
+          }
+        }
+        tsm.suspend()

Review Comment:
   No, I don't. Killing tasks may take some time, so I don't expect the TSM to finish immediately. `suspend()` intends to call `maybeFinishTaskSet()` to be safe, in case the TSM can't finish normally in the end. I don't see this being an issue in prod (the TSM should finish normally once all tasks finish), but I did see a test (`cancelTasks shall kill all the running tasks`) fail without `maybeFinishTaskSet()`. A sketch of `suspend()` is below.
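   
   For context, a hedged sketch of what `suspend()` amounts to per the description above (`isZombie` and `maybeFinishTaskSet()` are existing `TaskSetManager` members; the real patch may differ):
   
   ```scala
   // Assumed sketch: stop launching new tasks by going zombie, and try to finish
   // the task set in case all of its tasks are already gone (this is what the
   // "cancelTasks shall kill all the running tasks" test relies on).
   def suspend(): Unit = {
     isZombie = true
     maybeFinishTaskSet()
   }
   ```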





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1888711672

   @mridulm @cloud-fan Could you help merge the PR if you don't have other comments? Thanks!




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1845378574

   > Btw, we should flip this switch on and off in the relevant tests to check if the behavior is preserved.
   
   @mridulm I tried to enable `spark.scheduler.stage.legacyAbortAfterCancelTasks` in `DAGSchedulerSuite`. It does appear to cause several test failures 😢. I'm investigating the failures right now.




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1432382298


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -840,6 +856,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
     assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
+    Thread.sleep(3000)

Review Comment:
   Why do we need this `sleep`?



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -54,12 +54,30 @@ import org.apache.spark.util.ArrayImplicits._
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
 
+  dagScheduler.setEventProcessLoop(this)

Review Comment:
   To make sure I am not missing anything: this is to handle the case where processing one `DAGSchedulerEvent` generates another `DAGSchedulerEvent` (as part of the logic for the event), and we want the first to complete before the second is handled (which is how the 'normal' flow would behave).
   
   Is this right?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1407378513


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
+  // Kill all the tasks in all the stage attempts of the same stage Id

Review Comment:
   Please add a comment about `mark all the stage attempts as zombie`.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -66,10 +66,6 @@ private[spark] trait TaskScheduler {
    */
   def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
 
-  // Kill all the running task attempts in a stage.
-  // Throw UnsupportedOperationException if the backend doesn't support kill tasks.
-  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

Review Comment:
   Is this a breaking change?



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -1671,37 +1671,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)
   }
 
-  test("killAllTaskAttempts shall kill all the running tasks and not fail the stage") {
-    val taskScheduler = setupScheduler()
-
-    taskScheduler.initialize(new FakeSchedulerBackend {
-      override def killTask(
-          taskId: Long,
-          executorId: String,
-          interruptThread: Boolean,
-          reason: String): Unit = {
-        // Since we only submit one stage attempt, the following call is sufficient to mark the
-        // task as killed.
-        taskScheduler.taskSetManagerForAttempt(0, 0).get.runningTasksSet.remove(taskId)
-      }
-    })
-
-    val attempt1 = FakeTask.createTaskSet(10)
-    taskScheduler.submitTasks(attempt1)
-
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
-      new WorkerOffer("executor1", "host1", 1))
-    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-    assert(2 === taskDescriptions.length)
-    val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
-    assert(2 === tsm.runningTasks)
-
-    taskScheduler.killAllTaskAttempts(0, false, "test")

Review Comment:
   Shall we update the test case with `taskScheduler.cancelTasks`?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1407349896


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2179,12 +2164,12 @@ private[spark] class DAGScheduler(
           val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
             failure.toErrorString
           try {
-            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            // cancelTasks will fail if a SchedulerBackend does not implement killTask.
             val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
               "failed."
             val job = jobIdToActiveJob.get(failedStage.firstJobId)
             val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
-            taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)

Review Comment:
   Updated. I removed the "abort stage" inside `cancelTasks` and moved it to after the calls to `cancelTasks`. I also added a new conf `spark.scheduler.stage.abortStageAfterCancelTasks` to control whether we should abort the stage after `cancelTasks`. By default, we don't abort. A sketch of the resulting flow is below.
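   
   Roughly, the caller-side flow this describes, as a sketch inside `DAGScheduler` (illustrative only; `abortStage` stands in for whatever "abort stage" hook the patch actually uses):
   
   ```scala
   // Sketch only: cancelTasks() now just kills the tasks; the caller optionally
   // restores the old behavior behind the new conf (default: no abort).
   taskScheduler.cancelTasks(stageId, interruptThread, reason)
   if (sc.conf.get(config.ABORT_STAGE_AFTER_CANCEL_TASKS)) {
     abortStage(stage, reason, None) // legacy behavior: also fail the stage
   }
   ```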





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409183926


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2603,4 +2603,13 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS =
+    ConfigBuilder("spark.scheduler.stage.legacyAbortStageAfterCancelTasks")

Review Comment:
   should be `spark.legacy....`





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1422054329


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2603,4 +2603,14 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS =
+    ConfigBuilder("spark.scheduler.stage.legacyAbortAfterCancelTasks")
+      .doc("Whether to abort a stage after TaskScheduler.cancelTasks(). This is used to restore " +
+        "the original behavior in case there are any regressions after abort stage is removed " +
+        "from TaskScheduler.cancelTasks()")
+      .version("4.0.0")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   Given this is a new change, it might be good to be conservative about it.
   We can change the default in 4.1 or later; it does not need to happen at a major version.
   





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1402889739


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2179,12 +2164,12 @@ private[spark] class DAGScheduler(
           val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
             failure.toErrorString
           try {
-            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            // cancelTasks will fail if a SchedulerBackend does not implement killTask.
             val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
               "failed."
             val job = jobIdToActiveJob.get(failedStage.firstJobId)
             val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
-            taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)

Review Comment:
   I agree. The worst case is we manually trigger "abort stage" in the existing callers of `cancelTasks` to keep the old behavior.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408622291


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2603,4 +2603,13 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val ABORT_STAGE_AFTER_CANCEL_TASKS =
+    ConfigBuilder("spark.scheduler.stage.abortStageAfterCancelTasks")

Review Comment:
   Right, we should probably add the "legacy" keyword.



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -1671,37 +1671,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)
   }
 
-  test("killAllTaskAttempts shall kill all the running tasks and not fail the stage") {
-    val taskScheduler = setupScheduler()
-
-    taskScheduler.initialize(new FakeSchedulerBackend {
-      override def killTask(
-          taskId: Long,
-          executorId: String,
-          interruptThread: Boolean,
-          reason: String): Unit = {
-        // Since we only submit one stage attempt, the following call is sufficient to mark the
-        // task as killed.
-        taskScheduler.taskSetManagerForAttempt(0, 0).get.runningTasksSet.remove(taskId)
-      }
-    })
-
-    val attempt1 = FakeTask.createTaskSet(10)
-    taskScheduler.submitTasks(attempt1)
-
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
-      new WorkerOffer("executor1", "host1", 1))
-    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-    assert(2 === taskDescriptions.length)
-    val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
-    assert(2 === tsm.runningTasks)
-
-    taskScheduler.killAllTaskAttempts(0, false, "test")

Review Comment:
   Sounds good.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408984350


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1894,24 +1894,8 @@ private[spark] class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    markStageAsFinished(resultStage)

Review Comment:
   `cancelRunningIndependentStages` already does that, see:
   
   https://github.com/apache/spark/blob/14d854beeabcf872175edfd65cb6475ecb7d0ae7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2862-L2863





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1410068855


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1871,21 +1871,6 @@ private[spark] class DAGScheduler(
                     markStageAsFinished(resultStage)
                     cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
-                    try {
-                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                      // killTask.
-                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
-                        "or zombie tasks for this job")
-                      // ResultStage is only used by this job. It's safe to kill speculative or
-                      // zombie tasks in this stage.
-                      taskScheduler.killAllTaskAttempts(

Review Comment:
   @mridulm Thanks. I think you're right. I'll revert the changes around here.
   
   (I'd like to see if we have other ways to organize these functions better. It's a bit messy now.)





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1445731163


##########
core/src/test/scala/org/apache/spark/TempLocalSparkContext.scala:
##########
@@ -51,7 +51,7 @@ trait TempLocalSparkContext extends BeforeAndAfterEach
    */
   def sc: SparkContext = {
     if (_sc == null) {
-      _sc = new SparkContext(_conf)
+      _sc = new SparkContext(conf)

Review Comment:
   isn't it the same?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1863983002

   Hi @mridulm @cloud-fan I have moved "abort stage" outside of `cancelTasks` again and fixed the tests in another way (I previously fixed them by moving "abort stage" into `cancelTasks` 🥲). The "abort stage" flag is on by default. Tests look good now. Could you take another look? Thank you.




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1864096635

   @mridulm Added `DAGSchedulerAbortStageOffSuite` to test `legacyAbortAfterCancelTasks = false`.
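   
   A hypothetical shape for such a suite (the overridable `conf` accessor is assumed from the `TempLocalSparkContext` change discussed elsewhere in this thread; the real wiring may differ):
   
   ```scala
   // Hypothetical sketch: rerun the DAGScheduler tests with the legacy
   // abort-after-cancel behavior switched off.
   class DAGSchedulerAbortStageOffSuite extends DAGSchedulerSuite {
     override def conf: SparkConf = super.conf
       .set(config.LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS, false)
   }
   ```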




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1448202635


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1002,6 +1002,17 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
+  // Suspends this TSM to avoid launching new tasks.
+  //
+  // Unlike `abort()`, this function intentionally does not notify DAGScheduler, to avoid
+  // redundant operations. So the invocation to this function should assume DAGScheduler
+  // already knows about this TSM failure. For example, this function can be called from
+  // `TaskScheduler.cancelTasks` by DAGScheduler.

Review Comment:
   ditto





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408582440


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -66,10 +66,6 @@ private[spark] trait TaskScheduler {
    */
   def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
 
-  // Kill all the running task attempts in a stage.
-  // Throw UnsupportedOperationException if the backend doesn't support kill tasks.
-  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

Review Comment:
   this is private API





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408623095


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2205,12 +2190,12 @@ private[spark] class DAGScheduler(
           val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
             failure.toErrorString
           try {
-            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            // cancelTasks will fail if a SchedulerBackend does not implement killTask.
             val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
               "failed."
             val job = jobIdToActiveJob.get(failedStage.firstJobId)
             val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
-            taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)
+            taskScheduler.cancelTasks(stageId, shouldInterrupt, reason)

Review Comment:
   It is changed from `killAllTaskAttempts`, which doesn't abort the stage. So we don't abort here either.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408625247


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -66,10 +66,6 @@ private[spark] trait TaskScheduler {
    */
   def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
 
-  // Kill all the running task attempts in a stage.
-  // Throw UnsupportedOperationException if the backend doesn't support kill tasks.
-  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

Review Comment:
   Got it.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1851530021

   Damn! The barrier stage seems to be a special case. It called `killAllTaskAttempts()` to kill all the other tasks when there was a task failure, but didn't abort the stage since the stage would be retried later. In this PR, we replace `killAllTaskAttempts()` with `cancelTasks()` and enable stage abortion by default within `cancelTasks()`. This makes the barrier stage fail instead of retry. It would only work for all the cases if we're confident enough to remove stage abortion from `cancelTasks()` entirely, without any control flag.
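   
   For context, the barrier failure path in question, condensed from the `DAGScheduler` diff quoted elsewhere in this thread:
   
   ```scala
   // On a failed barrier task: kill the sibling task attempts, but do not fail
   // the stage itself; the barrier stage is expected to be retried.
   val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) failed."
   val job = jobIdToActiveJob.get(failedStage.firstJobId)
   val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
   taskScheduler.cancelTasks(stageId, shouldInterrupt, reason)
   // With stage abortion on by default inside cancelTasks(), this path now aborts
   // the barrier stage instead of leaving it eligible for retry.
   ```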
   
   




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1433910301


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -54,12 +54,30 @@ import org.apache.spark.util.ArrayImplicits._
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
 
+  dagScheduler.setEventProcessLoop(this)

Review Comment:
   Sounds good, thanks for clarifying!





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1448202496


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -296,18 +296,31 @@ private[spark] class TaskSchedulerImpl(
     new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
   }
 
-  override def cancelTasks(
+  // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
+  // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
+  // once all the tasks have finished. The stage attempt could be aborted after the call of
+  // `cancelTasks` if required.

Review Comment:
   ```suggestion
     // `killAllTaskAttempts ` if required.
   ```





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1402598488


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1871,21 +1871,6 @@ private[spark] class DAGScheduler(
                     markStageAsFinished(resultStage)
                     cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
-                    try {
-                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                      // killTask.
-                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
-                        "or zombie tasks for this job")
-                      // ResultStage is only used by this job. It's safe to kill speculative or
-                      // zombie tasks in this stage.
-                      taskScheduler.killAllTaskAttempts(

Review Comment:
   I agree:
   
   It looks like `taskScheduler.killAllTaskAttempts` was added in https://github.com/apache/spark/pull/22771 and at that time `cancelRunningIndependentStages` did not exist. The `cancelRunningIndependentStages` method was added later in https://github.com/apache/spark/pull/27050 and is a generalization that cancels both the extra tasks from the finished stage and any other independent zombie stage attempts.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409488053


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1871,21 +1871,6 @@ private[spark] class DAGScheduler(
                     markStageAsFinished(resultStage)
                     cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
-                    try {
-                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                      // killTask.
-                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
-                        "or zombie tasks for this job")
-                      // ResultStage is only used by this job. It's safe to kill speculative or
-                      // zombie tasks in this stage.
-                      taskScheduler.killAllTaskAttempts(

Review Comment:
   See my [comment above](https://github.com/apache/spark/pull/43954/files#r1409479495): unless we make other changes appropriately, we will have to revert this, as the tasks won't be killed.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408972767


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -296,18 +296,31 @@ private[spark] class TaskSchedulerImpl(
     new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
   }
 
+  // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
+  // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
+  // once all the tasks have finished. The stage attempt could be aborted after the call of
+  // `cancelTasks` if required.
   override def cancelTasks(
       stageId: Int,
       interruptThread: Boolean,
       reason: String): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     // Kill all running tasks for the stage.
-    killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " + reason)
-    // Cancel all attempts for the stage.
+    logInfo(s"Killing all running tasks in stage $stageId: $reason")
     taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
       attempts.foreach { case (_, tsm) =>
-        tsm.abort("Stage %s cancelled".format(stageId))
-        logInfo("Stage %d was cancelled".format(stageId))
+        // There are two possible cases here:
+        // 1. The task set manager has been created and some tasks have been scheduled.
+        //    In this case, send a kill signal to the executors to kill the task.
+        // 2. The task set manager has been created but no tasks have been scheduled. In this case,
+        //    simply continue.
+        tsm.runningTasksSet.foreach { tid =>
+          taskIdToExecutorId.get(tid).foreach { execId =>
+            backend.killTask(tid, execId, interruptThread, s"Stage cancelled: $reason")
+          }
+        }
+        tsm.suspend()

Review Comment:
   I guess you expect the `tsm` to be finished. But that may not necessarily happen.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1894,24 +1894,8 @@ private[spark] class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    markStageAsFinished(resultStage)

Review Comment:
   Why is `markStageAsFinished` no longer needed?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408670937


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -1671,37 +1671,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)
   }
 
-  test("killAllTaskAttempts shall kill all the running tasks and not fail the stage") {
-    val taskScheduler = setupScheduler()
-
-    taskScheduler.initialize(new FakeSchedulerBackend {
-      override def killTask(
-          taskId: Long,
-          executorId: String,
-          interruptThread: Boolean,
-          reason: String): Unit = {
-        // Since we only submit one stage attempt, the following call is sufficient to mark the
-        // task as killed.
-        taskScheduler.taskSetManagerForAttempt(0, 0).get.runningTasksSet.remove(taskId)
-      }
-    })
-
-    val attempt1 = FakeTask.createTaskSet(10)
-    taskScheduler.submitTasks(attempt1)
-
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
-      new WorkerOffer("executor1", "host1", 1))
-    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-    assert(2 === taskDescriptions.length)
-    val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
-    assert(2 === tsm.runningTasks)
-
-    taskScheduler.killAllTaskAttempts(0, false, "test")

Review Comment:
   I just realized that we have a test "cancelTasks shall kill all the running tasks and fail the stage" right above. So I think keeping that test should be enough.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409479495


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1894,24 +1894,8 @@ private[spark] class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    markStageAsFinished(resultStage)

Review Comment:
   They are not the same, @Ngone51: `markStageAsFinished` is for successful stage completions (when there is no `reason`), while `cancelRunningIndependentStages` aborts the job's other stages, which now need to be killed because the job terminated successfully.
   
   One impact of this is on the `errorMessage`: it will be `nonEmpty` when set by `cancelRunningIndependentStages`, and so it triggers the failure paths.
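   
   A sketch of the distinction (signatures assumed from the discussion):
   
   ```scala
   // Success path: the result stage completes with no error attached.
   markStageAsFinished(resultStage) // errorMessage = None
   // Cleanup path: the job's other running stages are cancelled with a reason, so
   // their completion carries errorMessage = Some(reason), which is nonEmpty and
   // therefore takes the failure branch downstream.
   cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
   ```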





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409325407


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1004,6 +1004,17 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
+  // Suspends this TSM to avoid launching new tasks.
+  //
+  // Unlike `abort()`, this function intentionally does not notify DAGScheduler, to avoid
+  // redundant operations. So the caller of this function should assume DAGScheduler

Review Comment:
   It's not expensive. The operation ("abort stage") is a no-op in the end, as I mentioned at https://github.com/apache/spark/pull/43954/files#r1402869071. I want to remove "abort stage" because I think it's not the right behaviour: "abort stage" always means that any active jobs depending on the stage need to fail. So it doesn't make sense to me when, for example, a result stage succeeds and the job succeeds, but we then need to cancel straggling running tasks in that result stage and abort the stage. The "abort" here will try to fail the job (which has already succeeded); it just doesn't happen today because DAGScheduler is thread-safe and the succeeded job has been removed from the active job list.
   
   If we want to be conservative, I'm fine with keeping it as it is.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1448203237


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,9 +54,9 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
+  // Kill all the tasks in all the stage attempts of the same stage Id
   // Throw UnsupportedOperationException if the backend doesn't support kill tasks.
-  def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit
+  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

Review Comment:
   super nit: shall we put this method after `def killTaskAttempt`? The same for the testing `TaskScheduler` implementations to reduce the code diff.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1401687380


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1871,21 +1871,6 @@ private[spark] class DAGScheduler(
                     markStageAsFinished(resultStage)
                     cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
-                    try {
-                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                      // killTask.
-                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
-                        "or zombie tasks for this job")
-                      // ResultStage is only used by this job. It's safe to kill speculative or
-                      // zombie tasks in this stage.
-                      taskScheduler.killAllTaskAttempts(

Review Comment:
   `cancelRunningIndependentStages` should already kill all the tasks of this stage.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1432400008


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -840,6 +856,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
     assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
+    Thread.sleep(3000)

Review Comment:
   Oops..test lines. Will remove.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1864073899

   > Are we not testing when legacyAbortAfterCancelTasks == false ?
   
   Like I mentioned at https://github.com/apache/spark/pull/43954#discussion_r1420598351, since the conf is static in Spark Core, we need to add a separate suite to test the `false` value for all the tests. It's probably still worth it, though. Let me add it.




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409052716


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1894,24 +1894,8 @@ private[spark] class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    markStageAsFinished(resultStage)

Review Comment:
   Got it.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409276985


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1004,6 +1004,17 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
+  // Suspends this TSM to avoid launching new tasks.
+  //
+  // Unlike `abort()`, this function intentionally does not notify DAGScheduler, to avoid
+  // redundant operations. So the caller of this function should assume DAGScheduler

Review Comment:
   So how expensive is the redundant operation? We may choose to always do it if it's cheap, to simplify the code.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409491574


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2860,6 +2844,11 @@ private[spark] class DAGScheduler(
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
               taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job), reason)
+              if (sc.getConf.get(LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS)) {

Review Comment:
   Pull this out as a `field`
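   For reference, a minimal sketch of the suggested refactor (the field name is an assumption; the config constant and the call site are the ones shown in this PR's diffs):
   ```
   // Read the flag once at DAGScheduler construction instead of querying
   // SparkConf on every stage cancellation.
   private val legacyAbortStageAfterCancelTasks: Boolean =
     sc.getConf.get(LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS)

   // ... and at the call site shown in the diff above:
   taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job), reason)
   if (legacyAbortStageAfterCancelTasks) {
     // legacy behavior: also abort the stage attempts
   }
   ```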





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1838517185

   @mridulm Thanks for the detailed comment.
   
   > Additional call to `suspend` for existing `killAllTaskAttempts`
   
   Note that we always call `markStageAsFinished` after the call to `killAllTaskAttempts`, so I think we don't expect to launch new tasks for such a stage. And `suspend`, especially "mark as zombie", exactly avoids that. I think this is actually a bug that we need to fix.
   
   
   > Lack of TaskSetFailed for existing cancelTasks (which could be impacting use from abortStage, job cancellation, etc)
   
   `TaskSetFailed` leads to `abortStage`. And I think the essential (main) effect of `abortStage` is to fail the active jobs which depend on the stage. I have the analysis [here](https://github.com/apache/spark/pull/43954#discussion_r1402869071) explaining why I think `abortStage` is a no-op in the end during the call to `cancelTasks`. Could you take a look there?
   




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1853299046

   Ah, interesting - I had not looked at barrier stage in as much detail; my initial observation was it worked fine, but you are right - this does break the assumption.




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1402597413


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2179,12 +2164,12 @@ private[spark] class DAGScheduler(
           val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
             failure.toErrorString
           try {
-            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            // cancelTasks will fail if a SchedulerBackend does not implement killTask.
             val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
               "failed."
             val job = jobIdToActiveJob.get(failedStage.firstJobId)
             val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
-            taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)

Review Comment:
   I think that there is a subtle difference here: `killAllTaskAttempts` only kills tasks, whereas `cancelTasks` also calls `tsm.abort()` on the stage attempts, which might enqueue a new `taskSetFailed` event for each task set and I think that could have unintended side effects. Can you double-check whether we think that change is okay?
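   For readers following along, a minimal sketch of the difference being discussed (simplified from `TaskSchedulerImpl`; the `taskSetsForStage` helper is an illustrative assumption):
   ```
   def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit =
     taskSetsForStage(stageId).foreach { tsm =>
       // kills the running tasks only; the task set stays alive and may retry them
       tsm.runningTasksSet.foreach(killTaskAttempt(_, interruptThread, reason))
     }

   def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit =
     taskSetsForStage(stageId).foreach { tsm =>
       tsm.runningTasksSet.foreach(killTaskAttempt(_, interruptThread, reason))
       // additionally aborts the attempt, which enqueues a taskSetFailed event
       tsm.abort(reason)
     }
   ```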





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408807471


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1871,21 +1871,6 @@ private[spark] class DAGScheduler(
                     markStageAsFinished(resultStage)
                     cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
-                    try {
-                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                      // killTask.
-                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
-                        "or zombie tasks for this job")
-                      // ResultStage is only used by this job. It's safe to kill speculative or
-                      // zombie tasks in this stage.
-                      taskScheduler.killAllTaskAttempts(

Review Comment:
   It turns out that we should switch the order between `markStageAsFinished` and `cancelRunningIndependentStages` after removing this code block, because `markStageAsFinished` removes the stage from `runningStages` while `cancelRunningIndependentStages` only cleans up stages that still exist in `runningStages`.
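   A sketch of the reordering proposed here (call sites taken from the diff above):
   ```
   // cancelRunningIndependentStages only cleans up stages still in runningStages,
   // so it must run before markStageAsFinished removes the result stage from it.
   cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
   markStageAsFinished(resultStage)
   cleanupStateForJobAndIndependentStages(job)
   ```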





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408964711


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -1671,37 +1671,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)
   }
 
-  test("killAllTaskAttempts shall kill all the running tasks and not fail the stage") {
-    val taskScheduler = setupScheduler()
-
-    taskScheduler.initialize(new FakeSchedulerBackend {
-      override def killTask(
-          taskId: Long,
-          executorId: String,
-          interruptThread: Boolean,
-          reason: String): Unit = {
-        // Since we only submit one stage attempt, the following call is sufficient to mark the
-        // task as killed.
-        taskScheduler.taskSetManagerForAttempt(0, 0).get.runningTasksSet.remove(taskId)
-      }
-    })
-
-    val attempt1 = FakeTask.createTaskSet(10)
-    taskScheduler.submitTasks(attempt1)
-
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
-      new WorkerOffer("executor1", "host1", 1))
-    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-    assert(2 === taskDescriptions.length)
-    val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
-    assert(2 === tsm.runningTasks)
-
-    taskScheduler.killAllTaskAttempts(0, false, "test")

Review Comment:
   I see. Thank you.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408624999


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
+  // Kill all the tasks in all the stage attempts of the same stage Id

Review Comment:
   I considered mentioning zombie here but finally gave up, as I think it's only an API and we don't enforce the same functionality for every implementation.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408664994


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
+  // Kill all the tasks in all the stage attempts of the same stage Id

Review Comment:
   Sounds good to me.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408581685


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2603,4 +2603,13 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val ABORT_STAGE_AFTER_CANCEL_TASKS =
+    ConfigBuilder("spark.scheduler.stage.abortStageAfterCancelTasks")

Review Comment:
   is this kind of a legacy config to restore a legacy behavior?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409488053


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1871,21 +1871,6 @@ private[spark] class DAGScheduler(
                     markStageAsFinished(resultStage)
                     cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
-                    try {
-                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                      // killTask.
-                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
-                        "or zombie tasks for this job")
-                      // ResultStage is only used by this job. It's safe to kill speculative or
-                      // zombie tasks in this stage.
-                      taskScheduler.killAllTaskAttempts(

Review Comment:
   See my [comment above](https://github.com/apache/spark/pull/43954/files#r1409479495): unless we make other changes appropriately, we will have to revert this, as the tasks won't be killed in `cancelRunningIndependentStages` because the stage won't be a running stage.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1420598351


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2603,4 +2603,13 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS =
+    ConfigBuilder("spark.legacy.scheduler.stage.abortAfterCancelTasks")

Review Comment:
   > Btw, we should flip this switch on and off in the relevant tests to check if the behavior is preserved.
   
   It is a bit annoying to flip this conf switch on and off only for the relevant unit tests in `DAGSchedulerSuite`, since it uses a global `SparkContext`. One way to do this would be to introduce an entire new suite called `DAGSchedulerWithAbortStageDisabledSuite` (see the sketch below), but I'm not sure that's worth doing. Or we could extract the relevant unit tests into a separate suite and then flip the conf with two on- and off- suites, though that's a bit complicated.
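   A hedged sketch of the separate-suite idea (the suite name comes from this comment; the overridable `conf` hook is the `TempLocalSparkContext` one discussed elsewhere in this thread, so the exact wiring is an assumption):
   ```
   // Re-run the relevant tests with the legacy abort behavior disabled by
   // overriding the suite-level SparkConf.
   class DAGSchedulerWithAbortStageDisabledSuite extends DAGSchedulerSuite {
     override def conf: SparkConf =
       super.conf.set(config.LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS, false)
   }
   ```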





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1432407891


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -54,12 +54,30 @@ import org.apache.spark.util.ArrayImplicits._
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
 
+  dagScheduler.setEventProcessLoop(this)

Review Comment:
   Right. Before this change, there were two event process loops involved in the testing DAGScheduler (i.e., `MyDAGScheduler`). One is the manually created `DAGSchedulerEventProcessLoopTester`, and the other is the default `DAGSchedulerEventProcessLoop` created inside `MyDAGScheduler`'s base class `DAGScheduler`. Events explicitly posted during a test go through `DAGSchedulerEventProcessLoopTester`, while events implicitly created inside `DAGScheduler` are posted by `DAGSchedulerEventProcessLoop`. This led to the events being processed by two separate threads, so the "first to complete before the second" policy could no longer be guaranteed. This change ensures we only use one event process loop during the test; see the sketch below.
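   A sketch of the single-loop wiring (the constructor body follows the diff at the top of this message):
   ```
   class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
     extends DAGSchedulerEventProcessLoop(dagScheduler) {

     // Point the DAGScheduler at this tester loop so that explicitly posted
     // test events and events generated inside DAGScheduler flow through the
     // same loop, and therefore the same thread.
     dagScheduler.setEventProcessLoop(this)
   }
   ```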





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1445733313


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
+  // Kill all the tasks in all the stage attempts of the same stage Id

Review Comment:
   Looking at the comment, shall we keep `killAllTaskAttempts` instead of `cancelTasks`, as the naming of `killAllTaskAttempts` fits the comment better?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1410066008


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1894,24 +1894,8 @@ private[spark] class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    markStageAsFinished(resultStage)

Review Comment:
   @mridulm Oh I see. Thanks for catching this.






Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1421803481


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2603,4 +2603,14 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS =
+    ConfigBuilder("spark.scheduler.stage.legacyAbortAfterCancelTasks")
+      .doc("Whether to abort a stage after TaskScheduler.cancelTasks(). This is used to restore " +
+        "the original behavior in case there are any regressions after abort stage is removed " +
+        "from TaskScheduler.cancelTasks()")
+      .version("4.0.0")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   hmm, isn't 4.0 a good chance to enable this change? I don't think 4.1 will be a better chance than 4.0.





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #43954: [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts
URL: https://github.com/apache/spark/pull/43954




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1448451079


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,9 +54,9 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
+  // Kill all the tasks in all the stage attempts of the same stage Id
   // Throw UnsupportedOperationException if the backend doesn't support kill tasks.
-  def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit
+  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

Review Comment:
   Updated, thanks!





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1448192078


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
+  // Kill all the tasks in all the stage attempts of the same stage Id

Review Comment:
   Updated, thanks!





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408582228


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2205,12 +2190,12 @@ private[spark] class DAGScheduler(
           val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
             failure.toErrorString
           try {
-            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            // cancelTasks will fail if a SchedulerBackend does not implement killTask.
             val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
               "failed."
             val job = jobIdToActiveJob.get(failedStage.firstJobId)
             val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
-            taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)
+            taskScheduler.cancelTasks(stageId, shouldInterrupt, reason)

Review Comment:
   shall we also abort the stage here?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1402869071


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2179,12 +2164,12 @@ private[spark] class DAGScheduler(
           val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
             failure.toErrorString
           try {
-            // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
+            // cancelTasks will fail if a SchedulerBackend does not implement killTask.
             val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
               "failed."
             val job = jobIdToActiveJob.get(failedStage.firstJobId)
             val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
-            taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)

Review Comment:
   This is really a good point. `taskSetFailed` will abort the stage and in turn fail the whole job, which is not the intended behaviour here. The problem with `killAllTaskAttempts` is that it doesn't mark the `TaskSetManager` as zombie after killing all the tasks, so the `TaskSetManager` could still launch new tasks (by retry), which is not expected.
   
   But I'm also wondering: do we really want to abort the stages in `cancelTasks`? `cancelTasks` is currently called only inside `cancelRunningIndependentStages`, and `cancelRunningIndependentStages` is directly or indirectly called in 3 cases:
   
   * When a job finishes successfully: in this case, we expect that all the stages in this job release their computation resources (i.e., kill all the tasks via `cleanupStateForJobAndIndependentStages`) immediately. But we don't expect this "release" action to lead to stage abortion and in turn fail the job. It doesn't fail the already-succeeded job today because the succeeded job has been cleaned up (no longer exists in the `activeJobs` list) by the time the `taskSetFailed` event arrives; see the pseudo-trace after this list.
    
   * When a job is requested to cancel: this case is essentially the same as the one above; only the job finishes in a different state.
   
   * When a stage aborts: in this case, we expect all the active jobs which depend on this stage to be cancelled, so we need to call `cancelRunningIndependentStages` on each active job. This finally falls back to the first case, as the active job will be cleaned up (via `cleanupStateForJobAndIndependentStages`) before the `taskSetFailed` event arrives.
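   A pseudo-trace of the sequencing that makes the abort a no-op in the first case (method names are from DAGScheduler, but this is an illustrative timeline, not real code):
   ```
   handleTaskCompletion(lastResultTask)         // the job completes
   cleanupStateForJobAndIndependentStages(job)  // job removed from activeJobs
   taskScheduler.cancelTasks(stageId, ...)      // kills tasks; tsm.abort() posts
                                                // a taskSetFailed event
   // ... later, on the DAGScheduler event thread:
   handleTaskSetFailed(...)                     // -> abortStage finds no active
                                                //    job depending on the stage,
                                                //    so nothing is failed
   ```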
   
   
   





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1842129215

   Thanks for the details @Ngone51, sorry for the delay in going over this; your explanation makes sense to me.
   Can you update the PR description to note that we are also fixing a bug through the `suspend` change?
   
   Will do a quick code review as well; I have already left some comments and will do a quick pass over the rest.




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1843019152

   Thanks @mridulm . Will address your comments.




Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1448181728


##########
core/src/test/scala/org/apache/spark/TempLocalSparkContext.scala:
##########
@@ -51,7 +51,7 @@ trait TempLocalSparkContext extends BeforeAndAfterEach
    */
   def sc: SparkContext = {
     if (_sc == null) {
-      _sc = new SparkContext(_conf)
+      _sc = new SparkContext(conf)

Review Comment:
   No. `conf` allows us to use a custom conf from inheriting classes, while `_conf` is hard-coded to `defaultSparkConf`.
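   A self-contained sketch of the getter-vs-field pattern at play (trait and member names here are illustrative, not the real `TempLocalSparkContext`):
   ```
   trait LocalContextSketch {
     private var _conf: SparkConf = new SparkConf()  // hard-coded default
     def conf: SparkConf = _conf                     // overridable hook

     // Building from `conf` (not `_conf`) is what lets a subclass's
     // `override def conf` actually take effect.
     def sc: SparkContext = new SparkContext(conf)
   }
   ```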






Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1411502671


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2603,4 +2603,13 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS =
+    ConfigBuilder("spark.legacy.scheduler.stage.abortAfterCancelTasks")

Review Comment:
   Namespace it below `scheduler` ? Something like `spark.scheduler.stage.legacyAbortAfterCancelTasks` ...
   Also, mark it as `internal` ?
   
   Btw, we should flip this switch on and off in the relevant tests to check if the behavior is preserved.



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2603,4 +2603,13 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS =
+    ConfigBuilder("spark.legacy.scheduler.stage.abortAfterCancelTasks")
+      .doc("Whether to abort a stage after TaskScheduler.cancelTasks(). This is used to restore " +
+        "the original behavior in case there are any regressions after abort stage is removed " +
+        "from TaskScheduler.cancelTasks()")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)

Review Comment:
   Do we want to default to `true` and switch to `false` in a later ver ?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408583579


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -296,18 +296,32 @@ private[spark] class TaskSchedulerImpl(
     new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
   }
 
+  // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
+  // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
+  // once all the tasks have finished. The stage attempt could be aborted after the call of

Review Comment:
   I'm trying to understand the rationale here. What's the key difference between marking as zombie and aborting the stage? Is it for the still-running tasks?





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408649384


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -296,18 +296,32 @@ private[spark] class TaskSchedulerImpl(
     new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
   }
 
+  // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
+  // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
+  // once all the tasks have finished. The stage attempt could be aborted after the call of

Review Comment:
   ```
   def abort(message: String, exception: Option[Throwable] = None): Unit = sched.synchronized {
     sched.dagScheduler.taskSetFailed(taskSet, message, exception)
     isZombie = true
     maybeFinishTaskSet()
   }
   ```
   
   When there is a call to `abort`, the TSM must be marked as zombie. So the key difference comes from `dagScheduler.taskSetFailed`, which essentially cleans up the data related to this stage and fails the jobs which depend on this stage.
   
   There's no difference for the TSM itself between zombie and abort. Tasks in the TSM can still run until they finish (whether killed or succeeded).
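   For contrast, a minimal sketch of what `suspend()` amounts to under these semantics (hypothetical body: `abort()` minus the `taskSetFailed` notification):
   ```
   def suspend(): Unit = sched.synchronized {
     // mark as zombie so no new tasks are launched for this attempt,
     // but do NOT call sched.dagScheduler.taskSetFailed(...)
     isZombie = true
     maybeFinishTaskSet()
   }
   ```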





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408809625


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1871,21 +1871,6 @@ private[spark] class DAGScheduler(
                     markStageAsFinished(resultStage)
                     cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
-                    try {
-                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                      // killTask.
-                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
-                        "or zombie tasks for this job")
-                      // ResultStage is only used by this job. It's safe to kill speculative or
-                      // zombie tasks in this stage.
-                      taskScheduler.killAllTaskAttempts(

Review Comment:
   Furthermore, we can also remove `markStageAsFinished` here, because `cancelRunningIndependentStages` already does that for each stage after the cleanup:
   
   https://github.com/apache/spark/blob/14d854beeabcf872175edfd65cb6475ecb7d0ae7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2862-L2863
   
   





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1408807471


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1871,21 +1871,6 @@ private[spark] class DAGScheduler(
                     markStageAsFinished(resultStage)
                     cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
-                    try {
-                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                      // killTask.
-                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
-                        "or zombie tasks for this job")
-                      // ResultStage is only used by this job. It's safe to kill speculative or
-                      // zombie tasks in this stage.
-                      taskScheduler.killAllTaskAttempts(

Review Comment:
   It turns out that we should switch the order between `markStageAsFinished` and `cancelRunningIndependentStages` after removing this code block, because `markStageAsFinished` removes the stage from `runningStages` while `cancelRunningIndependentStages` only cleans up stages that still exist in `runningStages`.
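   
   A tiny runnable sketch of the ordering trap (toy model with made-up names): if the result stage is removed from `runningStages` first, the cleanup pass silently skips it:
   
   ```
   import scala.collection.mutable
   
   object OrderSketch extends App {
     val runningStages = mutable.Set("resultStage", "otherStage")
   
     // Simplified: cleanup only visits stages still present in runningStages.
     def cancelRunningIndependentStages(): Unit =
       runningStages.toSet.foreach { stage =>
         println(s"cancelling $stage")
         runningStages -= stage
       }
   
     runningStages -= "resultStage"   // markStageAsFinished runs first...
     cancelRunningIndependentStages() // ...so only "otherStage" gets cancelled
   }
   ```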





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1409479495


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1894,24 +1894,8 @@ private[spark] class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    markStageAsFinished(resultStage)

Review Comment:
   They are not the same @Ngone51 - `markStageAsFinished` is for successful stage completions, while `cancelRunningIndependentStages` aborts the job's other stages, which now need to be killed because the job has terminated successfully.
   
   One impact of this is on the `errorMessage` - it will be `nonEmpty` when passed from `cancelRunningIndependentStages`, and so it triggers the failure paths.
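   
   For reference, a simplified sketch of that split (the real `markStageAsFinished` in `DAGScheduler` takes more parameters; this only models the `errorMessage` branch):
   
   ```
   object ErrorMessageSketch {
     def markStageAsFinished(stage: String, errorMessage: Option[String] = None): Unit =
       errorMessage match {
         case None =>
           println(s"Stage $stage finished successfully") // success bookkeeping
         case Some(reason) =>
           println(s"Stage $stage failed: $reason") // failure paths triggered
       }
   }
   ```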





Re: [PR] [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43954:
URL: https://github.com/apache/spark/pull/43954#issuecomment-1888717719

   thanks, merging to master!

