Posted to issues@spark.apache.org by "Mridul Muralidharan (Jira)" <ji...@apache.org> on 2022/12/21 03:37:00 UTC

[jira] [Assigned] (SPARK-41192) Task finished before speculative task scheduled leads to holding idle executors

     [ https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mridul Muralidharan reassigned SPARK-41192:
-------------------------------------------

    Assignee: Yazhi Wang

> Task finished before speculative task scheduled leads to holding idle executors
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-41192
>                 URL: https://issues.apache.org/jira/browse/SPARK-41192
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.2, 3.3.1
>            Reporter: Yazhi Wang
>            Assignee: Yazhi Wang
>            Priority: Minor
>              Labels: dynamic_allocation
>         Attachments: dynamic-executors, dynamic-log
>
>
> When a task finishes before its speculative copy has been scheduled by the DAGScheduler, the speculative task is still counted as pending and factors into the calculation of the number of needed executors, which leads to requesting more executors than are actually needed.
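> To make the accounting concrete, here is a simplified sketch of the calculation (illustrative only; the names and structure are assumptions, not the actual ExecutorAllocationManager code):
> {code:scala}
> // Simplified sketch of the executor-target calculation. If an original task
> // finishes before its speculative copy is ever dequeued, the pending
> // speculative count is never decremented, so the target stays inflated.
> def maxExecutorsNeeded(
>     pendingTasks: Int,
>     pendingSpeculativeTasks: Int,
>     runningTasks: Int,
>     tasksPerExecutor: Int): Int = {
>   val totalTasks = pendingTasks + pendingSpeculativeTasks + runningTasks
>   math.ceil(totalTasks.toDouble / tasksPerExecutor).toInt
> }
> {code}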
> h2. Background & Reproduction
> In one of our production jobs, we found that the ExecutorAllocationManager was holding more executors than needed.
> This was difficult to reproduce in a test environment. To reproduce it reliably and debug it, we temporarily commented out the speculative-task scheduling code in TaskSetManager:363, so that tasks are guaranteed to complete before their speculative copies are scheduled.
> {code:scala}
> // Original code
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Try to schedule a regular task first; if that returns None, then schedule
>   // a speculative task
>   dequeueTaskHelper(execId, host, maxLocality, false).orElse(
>     dequeueTaskHelper(execId, host, maxLocality, true))
> }
>
> // Modified code: speculative tasks will never be scheduled
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
>   // Only dequeue regular tasks; speculative copies stay pending forever
>   dequeueTaskHelper(execId, host, maxLocality, false)
> }
> {code}
> The reproduction below follows the example in SPARK-30511.
> When the last task is running, you will see that we are holding 38 executors (see the dynamic-executors attachment), which is exactly ceil((149 + 1) / 4) = 38. But only 2 tasks are actually running, which requires just Math.max(20, ceil(2.0 / 4)) = 20 executors.
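> Spelled out (a quick sketch of the arithmetic; the 4 task slots per executor come from spark.executor.cores=4 below with the default spark.task.cpus=1):
> {code:scala}
> // 149 stale pending speculative tasks + 1 running task, 4 slots per executor:
> math.ceil((149 + 1) / 4.0).toInt              // = 38 executors requested
> // Only 2 tasks actually running, floored by dynamicAllocation.minExecutors:
> math.max(20, math.ceil(2 / 4.0).toInt)        // = 20 executors actually needed
> {code}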
> {code:bash}
> ./bin/spark-shell --master yarn \
>   --conf spark.speculation=true \
>   --conf spark.executor.cores=4 \
>   --conf spark.dynamicAllocation.enabled=true \
>   --conf spark.dynamicAllocation.minExecutors=20 \
>   --conf spark.dynamicAllocation.maxExecutors=1000
> {code}
> {code:scala}
> val n = 4000
> val someRDD = sc.parallelize(1 to n, n)
> someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
>   if (index > 3998) {
>     Thread.sleep(1000 * 1000)  // 1 long-running task that triggers speculation
>   } else if (index > 3850) {
>     Thread.sleep(50 * 1000)    // 148 fake running tasks that also trigger
>                                // speculation but finish before their copies run
>   } else {
>     Thread.sleep(100)
>   }
>   Array.fill[Int](1)(1).iterator
> }).collect()
> {code}
>  
> I will have a PR ready to fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org