You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/02 15:37:00 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #28619: [SPARK-21040][CORE] Speculate tasks which are running on decommission executors

Ngone51 commented on a change in pull request #28619:
URL: https://github.com/apache/spark/pull/28619#discussion_r433964453



##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,112 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" +
+    " and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6*1000) // time = 6s
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4*1000) // time = 10s
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15s for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt

Review comment:
       nit: redundant whitespace

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,112 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" +
+    " and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6*1000) // time = 6s
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4*1000) // time = 10s
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15s for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")
+    assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
+    assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
+    assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Task2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s)
+    // Task3 started at t=6s, so it might not finish before t=15s. So Task 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Nothing should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1*1000) // time = 11s
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5*1000) // time = 16s
+    // At t=16s, Task 2 has been running for 16s. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now Task 2 will
+    // be selected for speculation. Here we are verifying that regular speculation configs
+    // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that task index 2 is launched on exec3, host3

Review comment:
       Can we unify the description for the task? As far as I see, we have `TASK 2`, `TASK2`, `Task2`, `task index 2` in the test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org