Posted to issues@spark.apache.org by "Thomas Graves (Jira)" <ji...@apache.org> on 2020/01/31 14:52:00 UTC
[jira] [Assigned] (SPARK-30511) Spark marks intentionally killed
speculative tasks as pending leads to holding idle executors
[ https://issues.apache.org/jira/browse/SPARK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Thomas Graves reassigned SPARK-30511:
-------------------------------------
Assignee: Zebing Lin
> Spark marks intentionally killed speculative tasks as pending leads to holding idle executors
> ---------------------------------------------------------------------------------------------
>
> Key: SPARK-30511
> URL: https://issues.apache.org/jira/browse/SPARK-30511
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.3.0
> Reporter: Zebing Lin
> Assignee: Zebing Lin
> Priority: Major
> Fix For: 3.0.0
>
> Attachments: Screen Shot 2020-01-15 at 11.13.17.png
>
>
> *TL;DR*
> When speculative tasks fail or get killed, they are still counted as pending and inflate the calculated number of needed executors.
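> For context, dynamic allocation derives its executor target from the pending and running task counts, roughly as in the simplified sketch below (approximate names; the real _ExecutorAllocationManager_ also applies rounding and an allocation ratio, details omitted):
> {code:java}
> // Simplified sketch, not the exact Spark source.
> // tasksPerExecutor = spark.executor.cores / spark.task.cpus
> def maxNumExecutorsNeeded(pendingTasks: Int,
>                           pendingSpeculativeTasks: Int,
>                           runningTasks: Int,
>                           tasksPerExecutor: Int): Int = {
>   val totalTasks = pendingTasks + pendingSpeculativeTasks + runningTasks
>   totalTasks / tasksPerExecutor
> }
> {code}
> Because ended speculative tasks never leave the pending count, _totalTasks_ stays inflated and the target never shrinks.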
> h3. Symptom
> In one of our production jobs (running 4 tasks per executor), we found that it was holding 6 executors at the end with only 2 tasks running (1 speculative). With more logging enabled, we saw the job print:
> {code:java}
> pendingTasks is 0 pendingSpeculativeTasks is 17 totalRunningTasks is 2
> {code}
> while the job had only 1 speculative task running; the other 16 speculative tasks had been intentionally killed because their corresponding original tasks had finished.
> An easy repro of the issue (`--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=1000` in cluster mode):
> {code:java}
> val n = 4000
> val someRDD = sc.parallelize(1 to n, n)
> someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
>   if (index < 300 && index >= 150) {
>     Thread.sleep(index * 1000) // Fake running tasks
>   } else if (index == 300) {
>     Thread.sleep(1000 * 1000) // Fake long running tasks
>   }
>   it.toList.map(x => index + ", " + x).iterator
> }).collect
> {code}
> You will see that while the last task is running, the job still holds 38 executors (see attachment), which is exactly (152 + 3) / 4 = 38.
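> As a sanity check, plugging this run's numbers into the simplified formula above (a hypothetical reading of the two quantities, since the log does not name them):
> {code:java}
> val stalePending = 152   // ended speculative tasks presumably still counted as pending
> val running = 3          // tasks actually running at this point
> val tasksPerExecutor = 4 // spark.executor.cores = 4
> val executorsHeld = (stalePending + running) / tasksPerExecutor // integer division = 38
> {code}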
> h3. The Bug
> Upon examining the code of _pendingSpeculativeTasks_:
> {code:java}
> stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
>   numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
> }.sum
> {code}
> where _stageAttemptToNumSpeculativeTasks(stageAttempt)_ is incremented in _onSpeculativeTaskSubmitted_ but never decremented; only _stageAttemptToNumSpeculativeTasks -= stageAttempt_ is performed, on stage completion. *This means Spark keeps counting ended speculative tasks as pending, which leads Spark to hold more executors than it actually needs!*
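> A minimal sketch of one possible fix (hypothetical; the actual PR may differ in details): mirror the increment by also decrementing the counter whenever a speculative task ends, whether it finished or was killed:
> {code:java}
> // Hypothetical sketch inside the allocation manager's listener; names follow
> // the snippet above rather than the exact Spark source.
> override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>   if (taskEnd.taskInfo.speculative) {
>     val stageAttempt = (taskEnd.stageId, taskEnd.stageAttemptId)
>     // Stop counting the ended speculative task as pending:
>     stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
>     stageAttemptToSpeculativeTaskIndices.get(stageAttempt)
>       .foreach(_ -= taskEnd.taskInfo.index)
>   }
>   // ... existing handling for normal task completions ...
> }
> {code}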
> I will have a PR ready to fix this issue, along with SPARK-28403.