Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/18 09:41:22 UTC

[GitHub] [spark] toujours33 opened a new pull request, #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

toujours33 opened a new pull request, #38711:
URL: https://github.com/apache/spark/pull/38711

   ### What changes were proposed in this pull request?
   `ExecutorAllocationManager` only records a count of speculative tasks. `stageAttemptToNumSpeculativeTasks` is incremented when a speculative task is submitted and is decremented only when a speculative task ends.
   If the original task finishes before its speculative copy starts, the speculative task is never scheduled. `stageAttemptToNumSpeculativeTasks` then leaks, and this skews the calculation of the target number of executors.
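   The following Scala sketch shows the leak described above. It makes two simplifying assumptions: there is a single stage attempt, and the old bookkeeping is reduced to a simple model. The class and method names are hypothetical and do not come from Spark. The counter changes only on speculative events, so a speculative copy whose original task has already finished is still counted as pending.

   ```scala
   import scala.collection.mutable

   // Hypothetical, simplified model of the old count-based bookkeeping for ONE stage attempt.
   class CountBasedSpeculationTracker {
     // Speculative tasks submitted (pending + running); only a speculative task end decrements it.
     private var numSpeculativeTasks = 0
     // Indices of speculative tasks that have actually started.
     private val runningSpeculative = mutable.HashSet.empty[Int]

     def onSpeculativeTaskSubmitted(): Unit = numSpeculativeTasks += 1

     def onTaskStart(taskIndex: Int, speculative: Boolean): Unit =
       if (speculative) runningSpeculative += taskIndex

     // Only speculative task ends touch the counter; a normal task end is ignored here.
     def onTaskEnd(taskIndex: Int, speculative: Boolean): Unit =
       if (speculative) {
         numSpeculativeTasks -= 1
         runningSpeculative -= taskIndex
       }

     def pendingSpeculativeTasks: Int = numSpeculativeTasks - runningSpeculative.size
   }
   ```

   Take this sequence: a speculative copy of task 2 is submitted, and then the original task 2 finishes. `pendingSpeculativeTasks` stays at 1 until the stage attempt is cleaned up.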
   
   This PR fixes the issue in two steps. It adds the task index to the `SparkListenerSpeculativeTaskSubmitted` event, and it records pending speculative tasks by task index. When a task finishes, its pending speculative task is now removed as well.
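   The following Scala sketch shows the index-based approach. It again assumes a single stage attempt, and its names are hypothetical rather than the real `ExecutorAllocationManager` fields. Pending speculative copies are kept as a set of task indices. An index leaves the set in either of two cases: when its speculative copy starts, or when any attempt of that task finishes.

   ```scala
   import scala.collection.mutable

   // Hypothetical, simplified model of index-based bookkeeping for ONE stage attempt.
   class IndexBasedSpeculationTracker {
     // Speculative copies requested but not yet launched, keyed by task index.
     private val pendingSpeculative = mutable.HashSet.empty[Int]
     // Speculative copies currently running, keyed by task index.
     private val runningSpeculative = mutable.HashSet.empty[Int]

     def onSpeculativeTaskSubmitted(taskIndex: Int): Unit =
       pendingSpeculative += taskIndex

     def onTaskStart(taskIndex: Int, speculative: Boolean): Unit =
       if (speculative) {
         // The copy is no longer pending once it has been launched.
         pendingSpeculative -= taskIndex
         runningSpeculative += taskIndex
       }

     def onTaskEnd(taskIndex: Int, speculative: Boolean): Unit = {
       // Once any attempt of this index (original or speculative) finishes,
       // a still-pending speculative copy is no longer needed.
       pendingSpeculative -= taskIndex
       if (speculative) runningSpeculative -= taskIndex
     }

     def pendingSpeculativeTasks: Int = pendingSpeculative.size
     def runningSpeculativeTasks: Int = runningSpeculative.size
   }
   ```

   Replay the same sequence: a speculative copy of task 2 is submitted, and then the original task 2 finishes. This time `pendingSpeculativeTasks` drops back to 0.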
   
   ### Why are the changes needed?
   To fix idle executors caused by pending speculative tasks whose original tasks have already finished.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. The DeveloperApi event `SparkListenerSpeculativeTaskSubmitted` gains a `taskIndex` field with a default value of -1.
   
   ### How was this patch tested?
   Added a comprehensive unit test.
   Passed GitHub Actions (GA).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034403916


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Number of speculative tasks pending in each stageAttempt
+    private val stageAttemptToUnsubmittedSpeculativeTasks =

Review Comment:
   BTW, renaming `stageAttemptToUnsubmittedSpeculativeTasks` to `stageAttemptToPendingSpeculativeTasks` is a good idea; I'll fix it soon.





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1036003754


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   I constructed a case with a shuffle retry. I then logged the `taskIndex` and `partitionId` values recorded in the task events `SparkListenerTaskStart` and `SparkListenerTaskEnd`.
   When a stage is submitted normally (not a retried stage or, more broadly, not a partial task submission), `taskIndex` equals `partitionId`:
   [Screenshot: https://user-images.githubusercontent.com/20420642/204814905-59625694-2e37-4484-8b10-522af8f0d61e.png]
   But when a shuffle stage is resubmitted with only some of its tasks, `taskIndex` does not necessarily equal `partitionId`:
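   The two values diverge because of how a task set is built. A task index is a position within the task set, while a partition ID comes from the RDD. A resubmitted stage includes only the partitions that still need to be computed. The small Scala illustration below uses hypothetical object and method names.

   ```scala
   // Hypothetical helper: map each task index in a task set to the partition it computes.
   object TaskIndexVsPartitionId {
     def taskIndexToPartitionId(partitionsToCompute: Seq[Int]): Map[Int, Int] =
       partitionsToCompute.zipWithIndex.map { case (partitionId, taskIndex) =>
         taskIndex -> partitionId
       }.toMap
   }
   ```

   For a full submission of four partitions, `taskIndexToPartitionId(Seq(0, 1, 2, 3))` is the identity mapping. For a retry that recomputes only partitions 3, 7 and 9, task indices 0, 1 and 2 map to partitions 3, 7 and 9.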





[GitHub] [spark] toujours33 commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1329974305

   Ping @mridulm, could you help take a look?




[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028830007


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-            new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToSpeculativeTaskIndices
+            .getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]).add(taskIndex)
+          stageAttemptToUnsubmittedSpeculativeTasks

Review Comment:
   I think it's OK here.
   As the PR describes, `stageAttemptToUnsubmittedSpeculativeTasks.remove(taskIndex)` should be called only when a speculative task starts or when a task finishes (whether or not that task is speculative).
   Line 754 does nothing if this speculative task was already removed when the task finished. That is expected and fine.
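   The ordering argument can be checked with a tiny Scala sketch. The helper below is hypothetical and is not Spark code. `mutable.HashSet.remove` returns whether the element was present. Whichever of the two events arrives second (speculative task start or task end) therefore finds nothing to remove and leaves the set unchanged.

   ```scala
   import scala.collection.mutable

   object PendingSpeculativeRemoval {
     // Returns true if taskIndex was still pending, false if an earlier event already removed it.
     def removePending(pending: mutable.HashSet[Int], taskIndex: Int): Boolean =
       pending.remove(taskIndex)
   }
   ```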
   





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034252202


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,11 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

Review Comment:
   Note: ideally we only need an `Int` for this, since we have split out `stageAttemptToUnsubmittedSpeculativeTasks`.
   For now, though, let us keep it as a HashSet. I have slight concerns about dropped events, which might make the maps inconsistent.
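   The Scala sketch below illustrates the concern about dropped events, using hypothetical classes. Suppose the listener misses a speculative task's start event but later sees its end. An `Int` counter then drifts below zero, whereas a set of task indices simply ignores the unmatched removal.

   ```scala
   import scala.collection.mutable

   // Hypothetical tracker: counts running speculative tasks in one stage attempt.
   final class IntRunningCounter {
     var running = 0
     def onStart(taskIndex: Int): Unit = running += 1
     def onEnd(taskIndex: Int): Unit = running -= 1
   }

   // Hypothetical tracker: records the indices of running speculative tasks in one stage attempt.
   final class IndexSetRunningTracker {
     val running = mutable.HashSet.empty[Int]
     def onStart(taskIndex: Int): Unit = running += taskIndex
     // Removing an index that was never added is a no-op.
     def onEnd(taskIndex: Int): Unit = running -= taskIndex
   }
   ```

   If the start event for task 3 is dropped and only `onEnd(3)` is delivered, the counter ends at -1 while the set stays empty.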



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -722,9 +723,8 @@ private[spark] class ExecutorAllocationManager(
         // because the attempt may still have running tasks,
         // even after another attempt for the stage is submitted.
         stageAttemptToNumTasks -= stageAttempt
-        stageAttemptToNumSpeculativeTasks -= stageAttempt
+        stageAttemptToUnsubmittedSpeculativeTasks -= stageAttempt

Review Comment:
   Remove from both maps.



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -896,9 +897,8 @@ private[spark] class ExecutorAllocationManager(
     }
 
     private def getPendingSpeculativeTaskSum(attempt: StageAttempt): Int = {
-      val numTotalTasks = stageAttemptToNumSpeculativeTasks.getOrElse(attempt, 0)
-      val numRunning = stageAttemptToSpeculativeTaskIndices.get(attempt).map(_.size).getOrElse(0)
-      numTotalTasks - numRunning
+      stageAttemptToUnsubmittedSpeculativeTasks
+        .getOrElse(attempt, mutable.HashSet.empty[Int]).size

Review Comment:
   nit:
   ```suggestion
         stageAttemptToUnsubmittedSpeculativeTasks.get(attempt).map(_.size).getOrElse(0)
   ```
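   Both forms return the same count. The original form builds an empty `HashSet` whenever the stage attempt has no entry, just to read its size. The default argument of `getOrElse` is by-name, so this happens only on a miss. The suggested form reads the size through the `Option` instead. Here is a small Scala check with hypothetical method names:

   ```scala
   import scala.collection.mutable

   object PendingCount {
     // Original form: on a missing key, builds an empty HashSet only to read its size.
     def viaGetOrElse[K](m: mutable.HashMap[K, mutable.HashSet[Int]], key: K): Int =
       m.getOrElse(key, mutable.HashSet.empty[Int]).size

     // Suggested form: a missing key yields None, then falls back to 0.
     def viaOption[K](m: mutable.HashMap[K, mutable.HashSet[Int]], key: K): Int =
       m.get(key).map(_.size).getOrElse(0)
   }
   ```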



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   Why not simply use `task.partitionId` instead for `taskIndex` ?





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034274241


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,11 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

Review Comment:
   I will add a TODO to mark this.





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034403079


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Number of speculative tasks pending in each stageAttempt
+    private val stageAttemptToUnsubmittedSpeculativeTasks =

Review Comment:
   `stageAttemptToSpeculativeTaskIndices` parallels `stageAttemptToTaskIndices`, so I would prefer to keep the name~





[GitHub] [spark] toujours33 commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1322147827

   > Seems we need to update the PR description @toujours33
   
   Thanks for the reminder~ Updated just now, @LuciferYang.




[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028788976


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -774,17 +776,16 @@ private[spark] class ExecutorAllocationManager(
             removeStageFromResourceProfileIfUnused(stageAttempt)
           }
         }
+

Review Comment:
   Done~





[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042987205


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-41192): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Number of speculative tasks pending in each stageAttempt

Review Comment:
   ```suggestion
       // Map from each stageAttempt to a set of pending speculative task indexes
   ```





[GitHub] [spark] mridulm commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1322379454

   We need `stageAttemptToSpeculativeTaskIndices` for `removeStageFromResourceProfileIfUnused`




[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1038740740


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   We are trying to identify whether two tasks in a stage attempt are computing the same partition.
   For that purpose, partitionId and taskIndex are effectively the same. Yes, the two values won't match when only a subset of partitions is computed in a stage attempt, but that is expected.
   
   That said, my intention was to minimize changes to the external API, not necessarily to the internal implementation.
   It looks like we are not able to do that... any thoughts, @Ngone51?





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1037744591


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   A fix along these lines:
   [Screenshot: https://user-images.githubusercontent.com/20420642/205205060-071827bf-bba2-47bc-b98d-7c778e00820d.png]
   [Screenshot: https://user-images.githubusercontent.com/20420642/205205206-fecb2fe3-bbc7-49e0-bf60-af5cc3a09f98.png]
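   The screenshots are not reproduced in this archive. The sketch below illustrates one common Scala pattern for adding a field to a `@DeveloperApi` case class without changing its primary constructor, and hence without changing the generated `apply`/`unapply`. It uses a hypothetical event type and is not necessarily what the screenshots show:

   ```scala
   // Hypothetical event type; not Spark's actual SparkListenerSpeculativeTaskSubmitted.
   case class SpeculativeTaskSubmittedEvent(stageId: Int, stageAttemptId: Int = 0) {
     // Kept out of the primary constructor so that existing callers and
     // two-field pattern matches keep compiling unchanged.
     private var _taskIndex: Int = -1
     def taskIndex: Int = _taskIndex

     // New callers use this auxiliary constructor to supply the task index.
     def this(stageId: Int, stageAttemptId: Int, taskIndex: Int) = {
       this(stageId, stageAttemptId)
       _taskIndex = taskIndex
     }
   }
   ```

   One design caveat: the generated `equals`, `hashCode` and `copy` ignore fields outside the primary constructor. Two events that differ only in `taskIndex` therefore compare equal, and `copy` resets `taskIndex` to -1.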
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1037085777


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-41192): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

Review Comment:
   Any reason for adding parentheses `()` on line 649?
   
   





[GitHub] [spark] toujours33 commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1321647910

   > I will need to think more about this issue, but I am leaning towards a variant of the solution proposed.
   
   Thanks for reviewing. `stageAttemptToUnsubmittedSpeculativeTasks` is a more accurate name; I will fix it in the next commit.




[GitHub] [spark] mridulm commented on pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1360815556

   Merged to master.
   Thanks for working on this @toujours33 !
   Thanks for the reviews @LuciferYang, @Ngone51, @dongjoon-hyun :-)




[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042987003


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt

Review Comment:
   ```suggestion
       // Map from each stageAttempt to a set of running speculative task indexes
   ```





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1043030519


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt

Review Comment:
   Got it, will fix soon.





[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1040337253


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   +1 to @mridulm's option.





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034397331


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.

Review Comment:
   Sorry, typo





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1035806206


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   I'll check later whether it makes a difference to use `partitionId` instead of `taskIndex`.
   By the way, even if we use `partitionId` instead, which would minimize the change to `SpeculativeTaskSubmitted` in `DAGSchedulerEvent`, the change to the developer API `SparkListenerSpeculativeTaskSubmitted` is still unavoidable, because `SparkListenerSpeculativeTaskSubmitted` currently takes only `stageId` and `stageAttemptId` as arguments.





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1049388491


##########
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala:
##########
@@ -671,6 +681,83 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     onExecutorRemoved(manager, "5")
   }
 
+  test("SPARK-41192: remove executors when task finished before speculative task scheduled") {
+    val clock = new ManualClock()
+    val stage = createStageInfo(0, 40)
+    val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4)
+    val manager = createManager(conf, clock = clock)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    // submit 40 tasks, total executors needed = 40/4 = 10
+    post(SparkListenerStageSubmitted(stage))
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 3)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+
+    (0 to 9).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
+    (0 to 39).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {

Review Comment:
   super nit: use `0 until 10`, `0 until 40`, etc. in this method, so that the bound matches what we have configured (without needing to correlate it with `end - 1`); this helps minimize errors in the future as the code evolves.
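To make the nit concrete, a small plain-Scala sketch (no Spark test helpers; the object and value names are illustrative only) showing that `0 until n` and `0 to n - 1` produce the same indices, and that the test's numbers line up: 40 tasks at 4 cores per executor gives 10 executors, and the ramp-up steps asserted in the test (1, 2, 4, 3) add up to that.

```scala
// Illustrative names only; the numbers mirror the test above.
object RangeBoundsSketch {
  val numTasks = 40
  val coresPerExecutor = 4
  val numExecutors: Int = numTasks / coresPerExecutor // 40 / 4 = 10

  // Same indices either way; `until` states the configured bound directly.
  val inclusive: List[Int] = (0 to numExecutors - 1).toList
  val exclusive: List[Int] = (0 until numExecutors).toList

  // Executor that task i is placed on, following the test's `i / 4` mapping.
  val taskToExecutor: Map[Int, String] =
    (0 until numTasks).map(i => i -> s"${i / coresPerExecutor}").toMap

  // Target increments asserted in the test; they should sum to numExecutors.
  val rampUpSteps: List[Int] = List(1, 2, 4, 3)
}
```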



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {

Review Comment:
   We don't need the default value.
   
   ```suggestion
     def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
   ```





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1048446891


##########
project/MimaExcludes.scala:
##########
@@ -122,6 +122,13 @@ object MimaExcludes {
     // [SPARK-41072][SS] Add the error class STREAM_FAILED to StreamingQueryException
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
 
+
+    // [SPARK-41192] add taskIndex in SparkListenerSpeculativeTaskSubmitted Event
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.apply"),

Review Comment:
   Fixed ~





[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1045549245


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1178,8 +1178,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent =
+      SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)
+    // add taskIndex field for Executor Dynamic Allocation
+    speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
+    speculativeTaskSubmittedEvent.updatePartitionId(task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)

Review Comment:
   Sounds better.





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028807362


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-            new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToSpeculativeTaskIndices

Review Comment:
   `allocationManager.synchronized` will be called to protect this
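A minimal sketch of the locking pattern the reply relies on, assuming a simplified manager where every listener callback takes a monitor before touching the maps. Here the sketch locks on its own instance rather than on an enclosing `allocationManager`; `AllocationStateSketch` and its methods are hypothetical names, not Spark's API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the allocation manager's listener state.
class AllocationStateSketch {
  private val stageToSpeculativeTaskIndices =
    mutable.HashMap.empty[Int, mutable.HashSet[Int]]

  // Listener callback: getOrElseUpdate plus add is not atomic on its own,
  // so the whole update runs under this object's monitor.
  def onSpeculativeTaskStart(stageId: Int, taskIndex: Int): Unit = synchronized {
    stageToSpeculativeTaskIndices
      .getOrElseUpdate(stageId, mutable.HashSet.empty[Int])
      .add(taskIndex)
  }

  // Readers (e.g. a target-executor calculation) take the same lock.
  def runningSpeculativeTasks(stageId: Int): Int = synchronized {
    stageToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
  }
}
```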





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1039084796


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   Maybe we can follow the suggestion proposed by @LuciferYang.





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1043000058


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   `Is it a lot of change to add support for it ?`
   If you just want to add `partitionId` as a field of the task event, the change will not be too large. The question is: why do we need `partitionId` in `ExecutorAllocationManager`?
   `I would want to keep both of them in sync, and prevent divergence`
   But if you want to keep `partitionId` and `taskIndex` in sync, or use `partitionId` instead of `taskIndex`, the changes will be huge, and I don't think it's appropriate to make them here.





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1036003754


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   I constructed a case with a shuffle retry and logged the `taskIndex` and `partitionId` recorded in the task events `SparkListenerTaskStart` and `SparkListenerTaskEnd`.
   When a stage is submitted normally (not a retry stage or, more broadly, not a partial task submission), `taskIndex` is equal to `partitionId`:
   <img width="1284" alt="image" src="https://user-images.githubusercontent.com/20420642/204814905-59625694-2e37-4484-8b10-522af8f0d61e.png">
   But when a shuffle stage is resubmitted with only some of its tasks, `taskIndex` is not necessarily equal to `partitionId`:
   <img width="1278" alt="image" src="https://user-images.githubusercontent.com/20420642/204815345-eef0eada-62b3-4e7c-b51d-1e9748e2235c.png">
   
   So if we used `partitionId` instead of `taskIndex` for `SpeculativeTaskSubmitted`, then, because `taskIndex` may not equal `partitionId`, we would fail to release pending speculative tasks whose original task finished before the speculative copy started, and the calculation of target executors would still be misled.
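A simplified model of the observation above (not Spark's actual `TaskSetManager` code): assuming a resubmitted stage's task set contains only the missing partitions, and tasks are indexed by their position in that set, `taskIndex` and `partitionId` coincide on a full submission but diverge on a partial one. `TaskSketch` and `TaskSetSketch.build` are hypothetical names.

```scala
// Hypothetical model: a task knows its position in the task set and its partition.
final case class TaskSketch(taskIndex: Int, partitionId: Int)

object TaskSetSketch {
  // Tasks are numbered by position in `partitionsToCompute`, not by partition id.
  def build(partitionsToCompute: Seq[Int]): Seq[TaskSketch] =
    partitionsToCompute.zipWithIndex.map { case (partitionId, idx) =>
      TaskSketch(taskIndex = idx, partitionId = partitionId)
    }
}
```

In the partial case, a speculative entry recorded under partition id 3 would never be matched by an end event reporting task index 1, which is the leak described above.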





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1044148832


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1178,8 +1178,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent =
+      SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)
+    // add taskIndex field for Executor Dynamic Allocation
+    speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
+    speculativeTaskSubmittedEvent.updatePartitionId(task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)

Review Comment:
   How about this:
   ```scala
   case class SparkListenerSpeculativeTaskSubmitted(
       stageId: Int,
       stageAttemptId: Int = 0)
     extends SparkListenerEvent {
     // Note: this is here for backwards-compatibility with older versions of this event which
     // didn't store taskIndex
     private var _taskIndex: Int = -1
     private var _partitionId: Int = -1
   
     def taskIndex: Int = _taskIndex
     def partitionId: Int = _partitionId
   
     def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = {
       this(stageId, stageAttemptId)
       _partitionId = partitionId
       _taskIndex = taskIndex
     }
   }
   ```
   
   We can construct it with `taskIndex` and `partitionId`:
   ```scala
   val speculativeTaskSubmittedEvent = new SparkListenerSpeculativeTaskSubmitted(
     task.stageId, task.stageAttemptId, taskIndex, task.partitionId)
   ```
   
   Default usage:
   ```scala
   val speculativeTaskSubmittedEvent = SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId)
   ```
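A self-contained sketch of the pattern proposed above, using a hypothetical `SpeculativeSubmittedSketch` class instead of Spark's event so it compiles without Spark. One property worth keeping in mind with this design: a case class derives `equals`/`hashCode` only from its primary constructor parameters, so two events that differ only in `taskIndex` or `partitionId` compare equal.

```scala
// Hypothetical stand-in for the proposed event (no Spark dependency).
case class SpeculativeSubmittedSketch(stageId: Int, stageAttemptId: Int = 0) {
  // Extra fields live outside the primary constructor, defaulting to -1.
  private var _taskIndex: Int = -1
  private var _partitionId: Int = -1

  def taskIndex: Int = _taskIndex
  def partitionId: Int = _partitionId

  // Auxiliary constructor fills in the new fields after the primary one runs.
  def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = {
    this(stageId, stageAttemptId)
    _taskIndex = taskIndex
    _partitionId = partitionId
  }
}

object CompatDemo {
  // Old-style construction keeps compiling; new fields stay at -1.
  val legacy = SpeculativeSubmittedSketch(1, 0)
  // New-style construction goes through `new` and the auxiliary constructor.
  val detailed = new SpeculativeSubmittedSketch(1, 0, 5, 7)
}
```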





[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042993398


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -733,7 +735,7 @@ private[spark] class ExecutorAllocationManager(
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty && stageAttemptToPendingSpeculativeTasks.isEmpty) {

Review Comment:
   Should it be:
   ```suggestion
           if (stageAttemptToNumTasks.isEmpty && stageAttemptToPendingSpeculativeTasks.isEmpty && stageAttemptToSpeculativeTaskIndices.isEmpty) {
   ```
   
   `stageAttemptToNumSpeculativeTasks` used to include both pending and running speculative tasks, but now it's split.
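A minimal sketch of why the extra condition matters, modeling the three per-stage-attempt maps with plain Scala collections. The map names follow the diff; `StageAttempt` is simplified to a `(stageId, attemptId)` pair and `schedulerQueueEmpty` is a hypothetical helper. Once the old pending-plus-running counter is split in two, a stage attempt with only running speculative tasks should still keep the queue non-empty.

```scala
import scala.collection.mutable

object QueueEmptinessSketch {
  type StageAttempt = (Int, Int) // simplified (stageId, attemptId)

  val stageAttemptToNumTasks = mutable.HashMap.empty[StageAttempt, Int]
  val stageAttemptToPendingSpeculativeTasks =
    mutable.HashMap.empty[StageAttempt, mutable.HashSet[Int]]
  val stageAttemptToSpeculativeTaskIndices =
    mutable.HashMap.empty[StageAttempt, mutable.HashSet[Int]]

  // Before the split one counter covered pending + running speculative tasks;
  // after the split both maps have to be checked.
  def schedulerQueueEmpty: Boolean =
    stageAttemptToNumTasks.isEmpty &&
      stageAttemptToPendingSpeculativeTasks.isEmpty &&
      stageAttemptToSpeculativeTaskIndices.isEmpty
}
```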





[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1044062231


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1178,8 +1178,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent =
+      SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)
+    // add taskIndex field for Executor Dynamic Allocation
+    speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
+    speculativeTaskSubmittedEvent.updatePartitionId(task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)

Review Comment:
   How about adding a new `apply()` to `SparkListenerSpeculativeTaskSubmitted` to simplify the construction? e.g.,
   ```scala
   object SparkListenerSpeculativeTaskSubmitted {
     def apply(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int)
       : SparkListenerSpeculativeTaskSubmitted = {
       val speculativeTaskSubmittedEvent =
         SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId)
       // add taskIndex field for Executor Dynamic Allocation
       speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
       speculativeTaskSubmittedEvent.updatePartitionId(partitionId)
       speculativeTaskSubmittedEvent
     }
   }
   ```
   so here we can construct the `SparkListenerSpeculativeTaskSubmitted` like `SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId, taskIndex, task.partitionId)`.
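A compilable sketch of the companion-`apply` approach, assuming the event exposes `updateTaskIndex`/`updatePartitionId` setters as in the diff. `EventSketch` and `SpecSubmittedEvent` are hypothetical stand-ins so the example does not depend on Spark; the case class and its companion are wrapped in one object so they are compiled together as companions.

```scala
object EventSketch {
  case class SpecSubmittedEvent(stageId: Int, stageAttemptId: Int = 0) {
    private var _taskIndex: Int = -1
    private var _partitionId: Int = -1

    def taskIndex: Int = _taskIndex
    def partitionId: Int = _partitionId

    // Setters mirroring the updateTaskIndex/updatePartitionId calls in the diff.
    def updateTaskIndex(taskIndex: Int): Unit = _taskIndex = taskIndex
    def updatePartitionId(partitionId: Int): Unit = _partitionId = partitionId
  }

  object SpecSubmittedEvent {
    // Extra overload next to the compiler-generated two-argument apply.
    def apply(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int)
      : SpecSubmittedEvent = {
      val event = SpecSubmittedEvent(stageId, stageAttemptId)
      event.updateTaskIndex(taskIndex)
      event.updatePartitionId(partitionId) // the parameter; no `task` is in scope here
      event
    }
  }
}
```

Callers that only know the stage keep using the two-argument form, while the scheduler side can pass all four values in one call.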





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034273628


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -722,9 +723,8 @@ private[spark] class ExecutorAllocationManager(
         // because the attempt may still have running tasks,
         // even after another attempt for the stage is submitted.
         stageAttemptToNumTasks -= stageAttempt
-        stageAttemptToNumSpeculativeTasks -= stageAttempt
+        stageAttemptToUnsubmittedSpeculativeTasks -= stageAttempt

Review Comment:
   Got it.





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034252202


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,11 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

Review Comment:
   Review Note: Ideally, we simply need an `Int` for this, since we have split out `stageAttemptToUnsubmittedSpeculativeTasks`.
   For now, let us keep it as a `HashSet` though: I have slight concerns around dropped events, which might make the maps go inconsistent.
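To illustrate the concern about dropped events, a sketch (hypothetical classes, not Spark code) comparing a bare counter with a set of task indices when the start event of a speculative task is lost but its end event still arrives. The counter drifts below zero, while removing an absent index from the set is a no-op.

```scala
import scala.collection.mutable

object CounterVsSetSketch {
  // Tracks running speculative tasks as a bare count.
  final class CounterTracker {
    var running: Int = 0
    def onStart(taskIndex: Int): Unit = running += 1
    def onEnd(taskIndex: Int): Unit = running -= 1
  }

  // Tracks running speculative tasks by task index.
  final class SetTracker {
    val running: mutable.HashSet[Int] = mutable.HashSet.empty[Int]
    def onStart(taskIndex: Int): Unit = running += taskIndex
    def onEnd(taskIndex: Int): Unit = running -= taskIndex // no-op if absent
  }
}
```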





[GitHub] [spark] LuciferYang commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1320030900

   cc @mridulm @Ngone51 FYI




[GitHub] [spark] AmplabJenkins commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1320213339

   Can one of the admins verify this patch?




[GitHub] [spark] mridulm commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1321576092

   Thanks for the PR to fix this bug !
   I will need to think more about this issue, but I am leaning towards a variant of the solution proposed.
   Namely:
   
   * `stageAttemptToNumSpeculativeTasks` -> `stageAttemptToUnsubmittedSpeculativeTasks`
     * These are task indices which have been marked speculatable, which have not yet been launched.
   * When a speculative task starts, remove it from `stageAttemptToUnsubmittedSpeculativeTasks`
   * When a non-speculative task finishes, remove it from `stageAttemptToUnsubmittedSpeculativeTasks`
   
   Thoughts @Ngone51, @toujours33, @LuciferYang ?
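A minimal sketch of the three rules above for a single map, assuming speculative submissions, task starts, and task ends arrive as listener callbacks carrying a task index. Only `stageAttemptToUnsubmittedSpeculativeTasks` comes from the proposal; the class, its method names, and the `(stageId, attemptId)` tuple standing in for `StageAttempt` are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical simplified tracker following the proposed rules.
final class UnsubmittedSpeculationSketch {
  type StageAttempt = (Int, Int) // simplified (stageId, attemptId)

  val stageAttemptToUnsubmittedSpeculativeTasks =
    mutable.HashMap.empty[StageAttempt, mutable.HashSet[Int]]

  // A task index was marked speculatable but has not launched yet.
  def onSpeculativeTaskSubmitted(sa: StageAttempt, taskIndex: Int): Unit =
    stageAttemptToUnsubmittedSpeculativeTasks
      .getOrElseUpdate(sa, mutable.HashSet.empty[Int]) += taskIndex

  // Rule 2: the speculative copy launched, so it is no longer unsubmitted.
  def onTaskStart(sa: StageAttempt, taskIndex: Int, speculative: Boolean): Unit =
    if (speculative) stageAttemptToUnsubmittedSpeculativeTasks.get(sa).foreach(_ -= taskIndex)

  // Rule 3: the original task finished, so its pending copy will never launch.
  def onTaskEnd(sa: StageAttempt, taskIndex: Int, speculative: Boolean): Unit =
    if (!speculative) stageAttemptToUnsubmittedSpeculativeTasks.get(sa).foreach(_ -= taskIndex)

  def pendingSpeculative(sa: StageAttempt): Int =
    stageAttemptToUnsubmittedSpeculativeTasks.get(sa).map(_.size).getOrElse(0)
}
```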
   
   




[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028803675


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-            new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToSpeculativeTaskIndices
+            .getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]).add(taskIndex)
+          stageAttemptToUnsubmittedSpeculativeTasks

Review Comment:
   If we use `new mutable.HashSet[Int]`, is it necessary to call remove?
   
   
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028823929


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-            new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToSpeculativeTaskIndices
+            .getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]).add(taskIndex)
+          stageAttemptToUnsubmittedSpeculativeTasks

Review Comment:
   My concern is: if line 754 does nothing, should that be regarded as a wrong state? Could this wrong state cause other issues?
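A short sketch of how the "does nothing" case could be made observable, assuming that is desired: `scala.collection.mutable.HashSet#remove` returns `false` when the element is absent. The helper `removePendingOrWarn` is hypothetical, and `println` stands in for whatever logging the real code would use.

```scala
import scala.collection.mutable

// Hypothetical pending set for a single stage attempt.
val pending: mutable.HashSet[Int] = mutable.HashSet(2, 4)

// remove returns false when the index was not pending, which lets the caller
// tell a normal removal apart from an unexpected state.
def removePendingOrWarn(set: mutable.HashSet[Int], taskIndex: Int): Boolean = {
  val removed = set.remove(taskIndex)
  if (!removed) {
    println(s"WARN: speculative task $taskIndex started but was not pending")
  }
  removed
}

val first = removePendingOrWarn(pending, 4)  // true: 4 was pending
val second = removePendingOrWarn(pending, 4) // false, and the warning is printed
```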
   
   





[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034375202


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -810,9 +812,10 @@ private[spark] class ExecutorAllocationManager(
       val stageId = speculativeTask.stageId
       val stageAttemptId = speculativeTask.stageAttemptId
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = speculativeTask.taskIndex
       allocationManager.synchronized {
-        stageAttemptToNumSpeculativeTasks(stageAttempt) =
-          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+        stageAttemptToUnsubmittedSpeculativeTasks.getOrElseUpdate(stageAttempt,

Review Comment:
   "Unsubmitted" here is confusing since the method name is `onSpeculativeTaskSubmitted`. How about `stageAttemptToPendingSpeculativeTasks`?



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -775,16 +779,14 @@ private[spark] class ExecutorAllocationManager(
           }
         }
         if (taskEnd.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
-          // If the previous task attempt succeeded first and it was the last task in a stage,
-          // the stage may have been removed before handing this speculative TaskEnd event.
-          if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
-            stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
-          }
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex))
         }
 
         taskEnd.reason match {
-          case Success | _: TaskKilled =>
+          case Success =>
+            // remove speculative task for task finished.

Review Comment:
   ```suggestion
               // Remove pending speculative task in case the normal task is finished before starting the speculative task
   ```



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.

Review Comment:
   SPARK-41192?



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Number of speculative tasks pending in each stageAttempt
+    private val stageAttemptToUnsubmittedSpeculativeTasks =

Review Comment:
   How about renaming to `stageAttemptToPendingSpeculativeTasks` and renaming `stageAttemptToSpeculativeTaskIndices` to `stageAttemptToRunningSpeculativeTaskIndices`?





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1037142914


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -843,9 +847,9 @@ private[spark] class ExecutorAllocationManager(
     def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): Unit = {
       if (!stageAttemptToNumRunningTask.contains(stageAttempt) &&
           !stageAttemptToNumTasks.contains(stageAttempt) &&
-          !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) &&
-          !stageAttemptToTaskIndices.contains(stageAttempt) &&
-          !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt)
+          !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt) &&

Review Comment:
   Hmm... why did `!stageAttemptToSpeculativeTaskIndices.contains(stageAttempt)` move from after `!stageAttemptToTaskIndices.contains(stageAttempt)` to before it?
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1037869060


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   If we want `taskIndex` to be a `val`, maybe we can consider using a size-1 `Array` as a container, and add two functions: one to update `Array(0)` and one to read it as `taskIndex`.
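A minimal sketch of the size-1 `Array` holder idea, using a hypothetical `SpeculativeTaskSubmittedWithHolder` case class rather than the real `SparkListenerSpeculativeTaskSubmitted`. The holder reference is a `val`, but its single slot stays writable, so `taskIndex` does not have to be a constructor parameter.

```scala
// Hypothetical event type; only the holder pattern matters here.
case class SpeculativeTaskSubmittedWithHolder(stageId: Int, stageAttemptId: Int = 0) {
  // The reference is a val, but the single slot can still be updated.
  private val taskIndexHolder: Array[Int] = Array(-1)

  def setTaskIndex(index: Int): Unit = {
    taskIndexHolder(0) = index
  }

  def taskIndex: Int = taskIndexHolder(0)
}

val event = SpeculativeTaskSubmittedWithHolder(stageId = 1)
event.setTaskIndex(7)
println(event.taskIndex) // 7
```

Note that `val` only fixes the reference: any caller that can reach `setTaskIndex` can still change the index, which is why keeping the field unmodifiable by users comes up elsewhere in this thread.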






[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1037744591


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Fix like this:
   [screenshot: https://user-images.githubusercontent.com/20420642/205205060-071827bf-bba2-47bc-b98d-7c778e00820d.png]
   [screenshot: https://user-images.githubusercontent.com/20420642/205205206-fecb2fe3-bbc7-49e0-bf60-af5cc3a09f98.png]
   cc - @mridulm @Ngone51 





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1039084796


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   Maybe we can follow @LuciferYang's suggestion: add a field to `SparkListenerSpeculativeTaskSubmitted` to minimize the API change. cc @mridulm @Ngone51
   https://github.com/apache/spark/pull/38711#discussion_r1037165963





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028802840


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-            new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToSpeculativeTaskIndices

Review Comment:
   Is there a concurrent access issue? `mutable.HashMap` is not thread-safe.
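The `onSpeculativeTaskSubmitted` hunk quoted in this thread shows its map updates wrapped in `allocationManager.synchronized`. Assuming the other handlers follow the same pattern, a non-thread-safe `mutable.HashMap` is safe as long as every read and write goes through that one lock. A small Scala demonstration of the pattern, where `RunningTaskTracker` and its methods are hypothetical stand-ins:

```scala
import scala.collection.mutable

// Hypothetical listener state guarded by a single lock (`this`), mirroring
// `allocationManager.synchronized { ... }` in the real handler.
class RunningTaskTracker {
  private val stageToRunningTasks = new mutable.HashMap[Int, Int]

  // Every write to the non-thread-safe map happens while holding the lock.
  def onTaskStart(stageId: Int): Unit = synchronized {
    stageToRunningTasks(stageId) = stageToRunningTasks.getOrElse(stageId, 0) + 1
  }

  // Reads take the same lock so they never observe a half-updated map.
  def running(stageId: Int): Int = synchronized {
    stageToRunningTasks.getOrElse(stageId, 0)
  }

  // Simulates several concurrent callers updating the same stage.
  def simulateConcurrentStarts(threads: Int, startsPerThread: Int, stageId: Int): Unit = {
    val workers = (1 to threads).map { _ =>
      new Thread(() => (1 to startsPerThread).foreach(_ => onTaskStart(stageId)))
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
  }
}

val runningTracker = new RunningTaskTracker
runningTracker.simulateConcurrentStarts(threads = 4, startsPerThread = 1000, stageId = 0)
println(runningTracker.running(0)) // 4000
```

Without the `synchronized` blocks, the read-modify-write in `onTaskStart` could lose updates, and the final count could come out below 4000.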
   
   





[GitHub] [spark] LuciferYang commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1322027575

   It seems the PR description needs to be updated, @toujours33.
   




[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028804426


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-            new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToSpeculativeTaskIndices
+            .getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]).add(taskIndex)
+          stageAttemptToUnsubmittedSpeculativeTasks

Review Comment:
   Or should `stageAttemptToUnsubmittedSpeculativeTasks.contains(stageAttempt)` always be true at this point?
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1039157936


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   > Interesting option, but I would not prefer it to be modifiable by users - given it is a developer api. One option would be to do something like:
   > 
   > ```
   > 
   > // modify json protocol, etc to 
   > private[spark] var _taskIndex: Int = -1
   > 
   > def taskIndex: Int = _taskIndex
   > ```
   > 
   > +CC @jiangxb1987 who had worked on this in the past. Thoughts @Ngone51
   
   I think this is ok
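The accessor pattern quoted above could be sketched as follows, with these assumptions: `SpeculativeTaskSubmittedSketch` is a hypothetical stand-in for `SparkListenerSpeculativeTaskSubmitted`, and a plain `private var` plus a secondary constructor replaces the quoted `private[spark] var` only so the sketch compiles outside the `spark` package. Listeners see a read-only `taskIndex`, and because the field stays out of the primary constructor, existing two-argument call sites would keep compiling.

```scala
// Hypothetical stand-in for SparkListenerSpeculativeTaskSubmitted.
case class SpeculativeTaskSubmittedSketch(stageId: Int, stageAttemptId: Int = 0) {
  // In Spark this would be `private[spark] var` so internal code (e.g. JSON
  // deserialization) can set it; a plain private var keeps this compilable.
  private var _taskIndex: Int = -1

  // Read-only view for listener implementations.
  def taskIndex: Int = _taskIndex

  // Secondary constructor through which the scheduler side supplies the index.
  def this(stageId: Int, stageAttemptId: Int, index: Int) = {
    this(stageId, stageAttemptId)
    _taskIndex = index
  }
}

val withIndex = new SpeculativeTaskSubmittedSketch(1, 0, 5)
val legacy = SpeculativeTaskSubmittedSketch(1, 0)
println(withIndex.taskIndex) // 5
println(legacy.taskIndex)    // -1
// The extra field is not a constructor parameter, so equals ignores it
// and copy() resets it to the default.
println(withIndex == legacy)        // true
println(withIndex.copy().taskIndex) // -1
```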





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1043028737


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Got it, I'll make a commit to try and fix this soon





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034404863


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -775,16 +779,14 @@ private[spark] class ExecutorAllocationManager(
           }
         }
         if (taskEnd.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
-          // If the previous task attempt succeeded first and it was the last task in a stage,
-          // the stage may have been removed before handing this speculative TaskEnd event.
-          if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
-            stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
-          }
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex))
         }
 
         taskEnd.reason match {
-          case Success | _: TaskKilled =>
+          case Success =>
+            // remove speculative task for task finished.

Review Comment:
   Good idea!



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -810,9 +812,10 @@ private[spark] class ExecutorAllocationManager(
       val stageId = speculativeTask.stageId
       val stageAttemptId = speculativeTask.stageAttemptId
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = speculativeTask.taskIndex
       allocationManager.synchronized {
-        stageAttemptToNumSpeculativeTasks(stageAttempt) =
-          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+        stageAttemptToUnsubmittedSpeculativeTasks.getOrElseUpdate(stageAttempt,

Review Comment:
   Got it





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034271797


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   In some scenarios, `task.partitionId` is not the same as `taskIndex`.
   For example, when a ShuffleMapStage is retried, the TaskSet only contains the tasks resubmitted due to the fetch failure.
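A small sketch of that divergence, assuming (as in `TaskSetManager`) that a task's index is its position in the TaskSet; the partition numbers are made up for illustration. When a retried stage resubmits only its missing partitions, the index and the partition id no longer line up.

```scala
// Hypothetical retry: only partitions 3, 7 and 9 lost their map output.
val missingPartitions: Seq[Int] = Seq(3, 7, 9)

// Each task's index is its position in the resubmitted TaskSet, so index and
// partition id only coincide when the TaskSet covers all partitions in order.
val taskIndexToPartitionId: Map[Int, Int] =
  missingPartitions.zipWithIndex.map { case (partitionId, taskIndex) =>
    taskIndex -> partitionId
  }.toMap

println(taskIndexToPartitionId) // Map(0 -> 3, 1 -> 7, 2 -> 9)
```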





[GitHub] [spark] toujours33 commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1323074280

   > How far back should this backport?
   
   I hope it can be backported to 3.3 and later (3.3 included), since 3.3 is mainly what we use in our production environment.




[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028821337


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-            new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToSpeculativeTaskIndices

Review Comment:
   Got it





[GitHub] [spark] jiangxb1987 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
jiangxb1987 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1041586075


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   +1 on a Spark-private variable.





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042129179


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Since `partitionId` is never used in this PR, is it only being added for possible future use?
   If so, I'd prefer to add it in a separate PR that addresses the problem described in [SPARK-37831](https://issues.apache.org/jira/browse/SPARK-37831). @mridulm 





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1043015456


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   This is about keeping `SparkListenerSpeculativeTaskSubmitted`, which is a public API, consistent, not just about the specific bug being addressed.





[GitHub] [spark] toujours33 commented on pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1360725361

   Since GA has passed, could someone merge the PR?
   @mridulm @dongjoon-hyun 




[GitHub] [spark] LuciferYang commented on pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1323065288

   How far back should this be backported?




[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028811606


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -749,8 +749,10 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
-            new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToSpeculativeTaskIndices
+            .getOrElseUpdate(stageAttempt, new mutable.HashSet[Int]).add(taskIndex)
+          stageAttemptToUnsubmittedSpeculativeTasks

Review Comment:
   1. How about `stageAttemptToUnsubmittedSpeculativeTasks.get(stageAttempt).foreach(_.remove(taskIndex))`?
   That way, nothing changes even if `stageAttemptToUnsubmittedSpeculativeTasks` has no entry for `stageAttempt` yet.
   2. Since `stageAttemptToUnsubmittedSpeculativeTasks` is thread-safe, `stageAttemptToUnsubmittedSpeculativeTasks.contains(stageAttempt)` would be correct anyway.
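   The bookkeeping discussed above can be sketched as a small standalone Scala script. This is a simplified model, not the actual `ExecutorAllocationManager` code: `StageAttempt` here is a stand-in case class, and `SpeculationTracker`, `onSpeculativeSubmitted`, `clearPending` and `pendingSpeculativeTasks` are hypothetical names; only the map name comes from the diff. It shows why the suggested `.get(stageAttempt).foreach(_.remove(taskIndex))` is safe: when no entry exists, `get` returns `None` and nothing happens, whereas `getOrElseUpdate` would insert an empty set.

```scala
import scala.collection.mutable

// Stand-in for Spark's internal StageAttempt key (assumption, simplified).
case class StageAttempt(stageId: Int, stageAttemptId: Int)

// Hypothetical model of pending-speculation bookkeeping, keyed by task index.
class SpeculationTracker {
  // Speculative copies that were requested but have not been launched yet.
  val stageAttemptToUnsubmittedSpeculativeTasks =
    new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]

  // A speculative copy was requested: record it as pending.
  def onSpeculativeSubmitted(sa: StageAttempt, taskIndex: Int): Unit =
    stageAttemptToUnsubmittedSpeculativeTasks
      .getOrElseUpdate(sa, new mutable.HashSet[Int]).add(taskIndex)

  // The copy started, or the original task finished first: either way it is no
  // longer pending. For an unknown stage attempt this is a no-op (no entry created).
  def clearPending(sa: StageAttempt, taskIndex: Int): Unit =
    stageAttemptToUnsubmittedSpeculativeTasks.get(sa).foreach(_.remove(taskIndex))

  // Total pending copies, i.e. what would feed the target-executor estimate.
  def pendingSpeculativeTasks: Int =
    stageAttemptToUnsubmittedSpeculativeTasks.values.map(_.size).sum
}

val tracker = new SpeculationTracker
val sa = StageAttempt(stageId = 1, stageAttemptId = 0)
tracker.onSpeculativeSubmitted(sa, taskIndex = 3)
tracker.clearPending(sa, taskIndex = 3)                 // original task finished first
tracker.clearPending(StageAttempt(2, 0), taskIndex = 0) // unknown attempt: no-op
println(tracker.pendingSpeculativeTasks)                // prints 0
```

   Without the `clearPending` call on task end, the entry for task index 3 would stay pending forever, which is the leak this PR addresses.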





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1037165963


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   If we need to ensure backward compatibility, can we try changing `SparkListenerSpeculativeTaskSubmitted` the same way as `SparkListenerJobStart`?
   
   https://github.com/apache/spark/blob/3fc8a90267312fdfc42b7e6a16ee69f2507ef56b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L73-L83
   
   





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1035515024


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   For the specific purpose here, even though they are different, shouldn't they work the same?
   My concern is similar to @Ngone51's comment below: we should minimize changes to the developer API, and given we can use `partitionId`, I would rely on that instead.
   (Do let me know if we can't!)
   





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1036003754


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   I constructed a case with a shuffle retry and logged the `taskIndex` and `partitionId` recorded in the task events `SparkListenerTaskStart` and `SparkListenerTaskEnd`.
   When a stage is submitted normally (not a retried stage or, more broadly, not a partial task submission), `taskIndex` equals `partitionId`:
   <img width="1284" alt="image" src="https://user-images.githubusercontent.com/20420642/204814905-59625694-2e37-4484-8b10-522af8f0d61e.png">
   But when a shuffle stage is resubmitted with only some of its tasks, `taskIndex` is not necessarily equal to `partitionId`:
   <img width="1278" alt="image" src="https://user-images.githubusercontent.com/20420642/204815345-eef0eada-62b3-4e7c-b51d-1e9748e2235c.png">
   
   So if we use `partitionId` instead of `taskIndex` for `SpeculativeTaskSubmitted`, then, because `taskIndex` may differ from `partitionId`, we won't release pending speculative tasks whose original task finished before the speculative copy started, and the calculation of target executors will again be misled.
   
   @mridulm Please check if there is any problem~ cc @Ngone51 @LuciferYang 
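   A simplified model of the mismatch described above, assuming (as in the screenshots) that a resubmitted attempt only creates tasks for its missing partitions, so task index `i` computes partition `missingPartitions(i)`. All names are hypothetical. If the pending set is filled with `partitionId` values but drained with the `taskIndex` carried by the task-end events, entries are left behind:

```scala
import scala.collection.mutable

// Hypothetical retry attempt that only recomputes partitions lost to a fetch failure.
val missingPartitions = Vector(2, 5, 7)

// In this attempt, task index i computes partition missingPartitions(i).
def partitionIdOf(taskIndex: Int): Int = missingPartitions(taskIndex)

// Speculative copies requested for task indices 0 and 1, recorded by partitionId.
val pendingByPartition = mutable.HashSet(partitionIdOf(0), partitionIdOf(1))

// Both originals finish before their copies launch, but the end events are
// matched by taskIndex (0 and 1), which are not the keys that were stored.
Seq(0, 1).foreach(taskIndex => pendingByPartition.remove(taskIndex))

println(pendingByPartition.toList.sorted.mkString(", ")) // prints "2, 5": both leaked
```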





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1037089750


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -722,9 +724,9 @@ private[spark] class ExecutorAllocationManager(
         // because the attempt may still have running tasks,
         // even after another attempt for the stage is submitted.
         stageAttemptToNumTasks -= stageAttempt
-        stageAttemptToNumSpeculativeTasks -= stageAttempt
-        stageAttemptToTaskIndices -= stageAttempt
         stageAttemptToSpeculativeTaskIndices -= stageAttempt
+        stageAttemptToPendingSpeculativeTasks -= stageAttempt
+        stageAttemptToTaskIndices -= stageAttempt

Review Comment:
   It seems this line is just reordered?





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1046674086


##########
project/MimaExcludes.scala:
##########
@@ -122,6 +122,13 @@ object MimaExcludes {
     // [SPARK-41072][SS] Add the error class STREAM_FAILED to StreamingQueryException
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
 
+
+    // [SPARK-41192] add taskIndex in SparkListenerSpeculativeTaskSubmitted Event
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.apply"),

Review Comment:
   Do we still need to add these MiMa filters?
   
   





[GitHub] [spark] mridulm closed pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm closed pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic
URL: https://github.com/apache/spark/pull/38711




[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1044060202


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   Don't you want to propagate `partitionId` as well, since you've added it to `SpeculativeTaskSubmitted`?





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042993786


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Is it a lot of work to add support for it?
   I would want to keep both of them in sync and prevent divergence, given the issues we have already faced with that.





[GitHub] [spark] LuciferYang commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1028783234


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -774,17 +776,16 @@ private[spark] class ExecutorAllocationManager(
             removeStageFromResourceProfileIfUnused(stageAttempt)
           }
         }
+

Review Comment:
   Please remove this empty line





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1039139958


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Interesting option, but I would prefer it not to be modifiable by users, given it is a developer API.
   One option would be to do something like:
   ```scala
   // modify json protocol, etc to 
   private[spark] var _taskIndex: Int = -1
   
   def taskIndex: Int = _taskIndex
   ```
   
   +CC @jiangxb1987 who had worked on this in the past.
   Thoughts, @Ngone51?
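   A minimal standalone sketch of that suggestion, using hypothetical names (`SpeculativeTaskSubmittedSketch`, `withTaskIndex`) rather than Spark's own. The new field lives in the class body instead of the constructor, so the generated `apply`/`unapply`/`copy` signatures stay unchanged. The mutable field is plain `private` here, which Scala still lets the companion object assign; the suggestion above uses `private[spark]` instead. One consequence worth keeping in mind: case-class `==` and `copy` only look at constructor parameters, so a body field is ignored by equality and is not carried over by `copy`.

```scala
// Hypothetical stand-in for SparkListenerSpeculativeTaskSubmitted.
case class SpeculativeTaskSubmittedSketch(stageId: Int, stageAttemptId: Int = 0) {
  // Kept out of the constructor so apply/unapply/copy signatures do not change.
  private var _taskIndex: Int = -1

  // Read-only view for listeners; -1 means "not provided".
  def taskIndex: Int = _taskIndex
}

object SpeculativeTaskSubmittedSketch {
  // A companion object may assign the class's private members.
  def withTaskIndex(stageId: Int, stageAttemptId: Int, taskIndex: Int): SpeculativeTaskSubmittedSketch = {
    val event = SpeculativeTaskSubmittedSketch(stageId, stageAttemptId)
    event._taskIndex = taskIndex
    event
  }
}

val legacy = SpeculativeTaskSubmittedSketch(1, 0)
val withIndex = SpeculativeTaskSubmittedSketch.withTaskIndex(1, 0, taskIndex = 4)

// Existing pattern matches on the two constructor fields keep compiling.
val SpeculativeTaskSubmittedSketch(stageId, attemptId) = withIndex
println(s"$stageId $attemptId ${legacy.taskIndex} ${withIndex.taskIndex}") // prints "1 0 -1 4"
println(legacy == withIndex) // prints "true": equality ignores the body field
```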





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1038740740


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))

Review Comment:
   We are trying to identify whether two tasks in a stage attempt are computing the same partition.
   Whether we use partitionId or taskIndex, they are effectively the same: yes, the values won't match when only a subset of partitions is computed in a stage attempt, but that is expected.
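   To illustrate that point with a simplified model (hypothetical names, and the assumption, taken from the discussion above, that task index `i` of a partial attempt computes `missingPartitions(i)`): within one stage attempt the mapping between task index and partition ID is one-to-one, so pending-speculation bookkeeping keyed by either one drains correctly, provided the submit and end events use the same key.

```scala
import scala.collection.mutable

// Hypothetical partial retry attempt: task index i computes missingPartitions(i).
val missingPartitions = Vector(2, 5, 7)

// Records and clears pending speculative copies using one consistent key function,
// returning whatever is still pending afterwards.
def leftoverPending(key: Int => Int): Set[Int] = {
  val pending = mutable.HashSet[Int]()
  // Speculative copies requested for task indices 0 and 1.
  Seq(0, 1).foreach(taskIndex => pending.add(key(taskIndex)))
  // Originals finish before the copies launch; clear with the same key.
  Seq(0, 1).foreach(taskIndex => pending.remove(key(taskIndex)))
  pending.toSet
}

val byTaskIndex = leftoverPending(taskIndex => taskIndex)
val byPartitionId = leftoverPending(taskIndex => missingPartitions(taskIndex))
println(byTaskIndex.isEmpty && byPartitionId.isEmpty) // prints "true": no leak either way
```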





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1040411406


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-41192): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()

Review Comment:
   Line is longer than 100 characters





[GitHub] [spark] toujours33 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1042997408


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   `Is it a lot of change to add support for it ?`
   If you just want to add `partitionId` as a field of the task event, the change won't be large.
   `I would want to keep both of them in sync, and prevent divergence`





[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1041745719


##########
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##########
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Since we are making an API change, given SPARK-37831, we should add `partitionId` as well.
   We can use either of them in `ExecutorAllocationManager`.





[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1044062231


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1178,8 +1178,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent =
+      SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)
+    // add taskIndex field for Executor Dynamic Allocation
+    speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
+    speculativeTaskSubmittedEvent.updatePartitionId(task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)

Review Comment:
   How about adding a new apply() to `SparkListenerSpeculativeTaskSubmitted` to simplify the construction? e.g.,
   ```scala
   object SparkListenerSpeculativeTaskSubmitted {
     def apply(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int)
       : SparkListenerSpeculativeTaskSubmitted = {
       val speculativeTaskSubmittedEvent =
         SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId)
       // set taskIndex and partitionId for executor dynamic allocation
       speculativeTaskSubmittedEvent.updateTaskIndex(taskIndex)
       speculativeTaskSubmittedEvent.updatePartitionId(partitionId)
       speculativeTaskSubmittedEvent
     }
   }
   ```
   so here we can construct the event as `SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId, taskIndex, task.partitionId)`, and I think it'd be less error-prone in case users forget to call the update methods.





[GitHub] [spark] LuciferYang commented on pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1360815601

   Thanks All !!!




[GitHub] [spark] toujours33 commented on pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Posted by GitBox <gi...@apache.org>.
toujours33 commented on PR #38711:
URL: https://github.com/apache/spark/pull/38711#issuecomment-1354264979

   My pleasure & Thank you all for reviewing~

