Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/15 09:20:51 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1049388491


##########
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala:
##########
@@ -671,6 +681,83 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     onExecutorRemoved(manager, "5")
   }
 
+  test("SPARK-41192: remove executors when task finished before speculative task scheduled") {
+    val clock = new ManualClock()
+    val stage = createStageInfo(0, 40)
+    val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4)
+    val manager = createManager(conf, clock = clock)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    // submit 40 tasks, total executors needed = 40/4 = 10
+    post(SparkListenerStageSubmitted(stage))
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 3)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+
+    (0 to 9).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
+    (0 to 39).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {

Review Comment:
   super nit: use `0 until 10`, `0 until 40`, etc. in this method, so that each bound matches what we have configured without needing to correlate it with `end - 1`. This helps minimize errors in the future as the code evolves.
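
To illustrate the nit, here is a minimal sketch of how `until` keeps the bound equal to the configured count. The object and constant names (`RangeBoundsSketch`, `numExecutors`, `numTasks`, `coresPerExecutor`) are hypothetical and do not appear in the PR; the values mirror the 40 tasks, 10 executors, and 4 cores used by the test.

```scala
// Hypothetical names for illustration only; values mirror the test's configuration.
object RangeBoundsSketch {
  val numExecutors = 10
  val numTasks = 40
  val coresPerExecutor = 4

  // Inclusive range: the reader has to correlate the bound with `numTasks - 1`.
  val inclusiveTaskIds: Range = 0 to (numTasks - 1)

  // Exclusive range: the bound is exactly the configured count.
  val exclusiveTaskIds: Range = 0 until numTasks

  // Same executor assignment as the test: task i runs on executor i / coresPerExecutor.
  def executorIdFor(taskIndex: Int): String = s"${taskIndex / coresPerExecutor}"
}
```

Both ranges contain the same elements, so the change is purely about readability: if the task count changes later, only one number needs to be updated.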



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1): Unit = {

Review Comment:
   We don't need the default value.
   
   ```suggestion
     def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
   ```
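
As a rough sketch of what dropping the default changes in general: with a default, a call site can omit the index and silently receive the `-1` sentinel, whereas without it the compiler requires every caller to pass the index. The object and method names below are hypothetical stand-ins, not Spark's `DAGScheduler` API.

```scala
// Hypothetical stand-in for illustration; this is not Spark's DAGScheduler.
object DefaultParamSketch {
  // With a default value, callers may omit taskIndex and get the -1 sentinel.
  def submittedWithDefault(taskName: String, taskIndex: Int = -1): Int = taskIndex

  // Without a default, omitting taskIndex is a compile-time error,
  // so every call site has to supply a real index.
  def submittedWithoutDefault(taskName: String, taskIndex: Int): Int = taskIndex
}
```

With the second form, a call such as `submittedWithoutDefault("task-0")` does not compile, which surfaces any call site that was missed.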



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

