Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/29 07:32:02 UTC

[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034375202


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -810,9 +812,10 @@ private[spark] class ExecutorAllocationManager(
       val stageId = speculativeTask.stageId
       val stageAttemptId = speculativeTask.stageAttemptId
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = speculativeTask.taskIndex
       allocationManager.synchronized {
-        stageAttemptToNumSpeculativeTasks(stageAttempt) =
-          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+        stageAttemptToUnsubmittedSpeculativeTasks.getOrElseUpdate(stageAttempt,

Review Comment:
   "Unsubmitted" here is confusing since the method name here is `onSpeculativeTaskSubmitted`.. how about `stageAttemptToPendingSpeculativeTasks`?



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -775,16 +779,14 @@ private[spark] class ExecutorAllocationManager(
           }
         }
         if (taskEnd.taskInfo.speculative) {
-          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
-          // If the previous task attempt succeeded first and it was the last task in a stage,
-          // the stage may have been removed before handing this speculative TaskEnd event.
-          if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
-            stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
-          }
+          stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex))
         }
 
         taskEnd.reason match {
-          case Success | _: TaskKilled =>
+          case Success =>
+            // remove speculative task for task finished.

Review Comment:
   ```suggestion
               // Remove the pending speculative task in case the normal task finishes before the speculative task starts
   ```
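   For illustration only, a minimal sketch of the TaskEnd bookkeeping this suggestion describes, using simplified stand-ins for `StageAttempt`, `Success`, and `TaskKilled` rather than the real Spark types:

   ```scala
   import scala.collection.mutable

   // Simplified stand-ins for the Spark-internal types in this hunk
   // (assumptions, not the real definitions).
   case class StageAttempt(stageId: Int, stageAttemptId: Int)
   sealed trait TaskEndReason
   case object Success extends TaskEndReason
   case class TaskKilled(reason: String) extends TaskEndReason

   class TaskEndBookkeeping {
     private val stageAttemptToPendingSpeculativeTasks =
       new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
     private val stageAttemptToRunningSpeculativeTaskIndices =
       new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]

     def onTaskEnd(stageAttempt: StageAttempt, taskIndex: Int,
         speculative: Boolean, reason: TaskEndReason): Unit = synchronized {
       if (speculative) {
         // A finished speculative attempt is no longer running.
         stageAttemptToRunningSpeculativeTaskIndices.get(stageAttempt)
           .foreach(_.remove(taskIndex))
       }
       reason match {
         case Success =>
           // Remove the pending speculative task in case the normal task
           // finished before its speculative copy was ever scheduled, so the
           // executor demand estimate is not inflated by a task that will never run.
           stageAttemptToPendingSpeculativeTasks.get(stageAttempt)
             .foreach(_.remove(taskIndex))
         case _ =>
           // Other end reasons (e.g. TaskKilled) are handled elsewhere in the real code.
       }
     }
   }
   ```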



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.

Review Comment:
   SPARK-41192?



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Number of speculative tasks running in each stageAttempt
+    // TODO(SPARK-14492): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Number of speculative tasks pending in each stageAttempt
+    private val stageAttemptToUnsubmittedSpeculativeTasks =

Review Comment:
   How about renaming `stageAttemptToUnsubmittedSpeculativeTasks` to `stageAttemptToPendingSpeculativeTasks` and `stageAttemptToSpeculativeTaskIndices` to `stageAttemptToRunningSpeculativeTaskIndices`?
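   For illustration only, how the two declarations might read after the proposed renames; `StageAttempt` here is a simplified stand-in for the Spark-internal class:

   ```scala
   import scala.collection.mutable

   // Simplified stand-in for the Spark-internal StageAttempt.
   case class StageAttempt(stageId: Int, stageAttemptId: Int)

   object SpeculativeTaskState {
     // Speculative tasks that have actually started running in each stage attempt.
     val stageAttemptToRunningSpeculativeTaskIndices =
       new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]

     // Speculative tasks submitted but not yet scheduled in each stage attempt.
     val stageAttemptToPendingSpeculativeTasks =
       new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
   }
   ```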



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

