You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/29 07:32:02 UTC
[GitHub] [spark] Ngone51 commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic
Ngone51 commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1034375202
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -810,9 +812,10 @@ private[spark] class ExecutorAllocationManager(
val stageId = speculativeTask.stageId
val stageAttemptId = speculativeTask.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
+ val taskIndex = speculativeTask.taskIndex
allocationManager.synchronized {
- stageAttemptToNumSpeculativeTasks(stageAttempt) =
- stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+ stageAttemptToUnsubmittedSpeculativeTasks.getOrElseUpdate(stageAttempt,
Review Comment:
"Unsubmitted" is confusing here, since the method is named `onSpeculativeTaskSubmitted`. How about `stageAttemptToPendingSpeculativeTasks`?
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -775,16 +779,14 @@ private[spark] class ExecutorAllocationManager(
}
}
if (taskEnd.taskInfo.speculative) {
- stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
- // If the previous task attempt succeeded first and it was the last task in a stage,
- // the stage may have been removed before handing this speculative TaskEnd event.
- if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
- stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
- }
+ stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex))
}
taskEnd.reason match {
- case Success | _: TaskKilled =>
+ case Success =>
+ // remove speculative task for task finished.
Review Comment:
```suggestion
// Remove pending speculative task in case the normal task is finished before starting the speculative task
```
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
// Should be 0 when no stages are active.
private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
- // Number of speculative tasks pending/running in each stageAttempt
- private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
- // The speculative tasks started in each stageAttempt
+ // Number of speculative tasks running in each stageAttempt
+ // TODO(SPARK-14492): We simply need an Int for this.
Review Comment:
Should this TODO reference SPARK-41192 (this PR's JIRA) rather than SPARK-14492?
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
// Should be 0 when no stages are active.
private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
- // Number of speculative tasks pending/running in each stageAttempt
- private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
- // The speculative tasks started in each stageAttempt
+ // Number of speculative tasks running in each stageAttempt
+ // TODO(SPARK-14492): We simply need an Int for this.
private val stageAttemptToSpeculativeTaskIndices =
+ new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+ // Number of speculative tasks pending in each stageAttempt
+ private val stageAttemptToUnsubmittedSpeculativeTasks =
Review Comment:
How about renaming this to `stageAttemptToPendingSpeculativeTasks`, and renaming `stageAttemptToSpeculativeTaskIndices` to `stageAttemptToRunningSpeculativeTaskIndices`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org