Posted to issues@spark.apache.org by "Yazhi Wang (Jira)" <ji...@apache.org> on 2022/11/18 04:14:00 UTC

[jira] [Created] (SPARK-41192) Task finished before speculative task scheduled leads to holding idle executors

Yazhi Wang created SPARK-41192:
----------------------------------

             Summary: Task finished before speculative task scheduled leads to holding idle executors
                 Key: SPARK-41192
                 URL: https://issues.apache.org/jira/browse/SPARK-41192
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.3.1, 3.2.2
            Reporter: Yazhi Wang


When a task finishes before its speculative copy has been scheduled by the DAGScheduler, the speculative task is still considered pending and counts towards the calculation of the number of needed executors, which leads to requesting more executors than are actually needed.
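For context, here is a simplified sketch (not the exact ExecutorAllocationManager source; the method name and signature are ours) of how the dynamic-allocation target is derived from the task counts, and why a stale speculative task inflates it:
{code:java}
// Simplified sketch of the target calculation: a stale speculative task keeps
// contributing to the total even after the original task has finished.
def maxNumExecutorsNeeded(
    pending: Int,
    pendingSpeculative: Int,
    running: Int,
    tasksPerExecutor: Int): Int = {
  val totalTasks = pending + pendingSpeculative + running
  math.ceil(totalTasks.toDouble / tasksPerExecutor).toInt
} {code}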
h2. Background & Reproduce

In one of our production jobs, we found that ExecutorAllocationManager was holding more executors than needed.

We found it difficult to reproduce in the test environment. In order to reproduce and debug it reliably, we temporarily commented out the speculative-task scheduling code at TaskSetManager:363 to ensure that the task completes before its speculative copy is scheduled.
{code:java}
// Original code
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Tries to schedule a regular task first; if it returns None, then schedules
  // a speculative task
  dequeueTaskHelper(execId, host, maxLocality, false).orElse(
    dequeueTaskHelper(execId, host, maxLocality, true))
} 
// Modified code: the speculative task will never be scheduled
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // Only tries to schedule a regular task; the orElse fallback to a
  // speculative task has been removed, so it is never scheduled
  dequeueTaskHelper(execId, host, maxLocality, false)
}  {code}
The reproduction below refers to the examples in SPARK-30511.

You will see that while the last tasks are running we hold 38 executors (see attachment), which matches ceil((149 + 1) / 4) = 38. But there are actually only 2 tasks still running, which requires only Math.max(20, ceil(2 / 4)) = 20 executors.
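As a rough back-of-the-envelope check, using the conf values from the spark-shell command below (spark.executor.cores=4, spark.dynamicAllocation.minExecutors=20; the variable names here are ours):
{code:java}
// Hypothetical illustration of the mismatch between what is requested
// and what the 2 remaining tasks actually need.
val held   = math.ceil((149 + 1) / 4.0).toInt        // => 38 executors requested
val needed = math.max(20, math.ceil(2 / 4.0).toInt)  // => 20 executors needed {code}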
{code:java}
./bin/spark-shell --master yarn \
  --conf spark.speculation=true \
  --conf spark.executor.cores=4 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=20 \
  --conf spark.dynamicAllocation.maxExecutors=1000 {code}
{code:java}
val n = 4000
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index > 3998) {
    Thread.sleep(1000 * 1000)
  } else if (index > 3850) {
    Thread.sleep(20 * 1000) // Fake running tasks
  } else {
    Thread.sleep(100)
  }
  Array.fill[Int](1)(1).iterator
}).collect() // action added so the job actually runs {code}
 

I will have a PR ready to fix this issue.


