Posted to issues@spark.apache.org by "Tejas Patil (JIRA)" <ji...@apache.org> on 2017/01/22 03:46:26 UTC

[jira] [Created] (SPARK-19326) Speculated task attempts do not get launched in a few scenarios

Tejas Patil created SPARK-19326:
-----------------------------------

             Summary: Speculated task attempts do not get launched in a few scenarios
                 Key: SPARK-19326
                 URL: https://issues.apache.org/jira/browse/SPARK-19326
             Project: Spark
          Issue Type: Bug
          Components: Scheduler
    Affects Versions: 2.1.0, 2.0.2
            Reporter: Tejas Patil


Speculated copies of tasks do not get launched in some cases.

Examples:
- All the running executors have no CPU slots left to accommodate a speculated copy of the task(s). If all of the running executors reside on a set of slow / bad hosts, they will keep the job running for a long time.
- `spark.task.cpus` > 1 and the running executor has not filled up all of its CPU slots. The free slots cannot be used for speculation because a [speculated copy of a task must run on a different host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283] than the one where the original copy was launched (see the sketch below).
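
The host constraint behaves roughly like the minimal sketch below (illustrative only, not the actual `TaskSetManager` code; the method and parameter names are made up):

{code}
// A speculative copy of a task may only be launched on a host where no
// copy of that same task is already running, even if the offered host
// still has free CPU slots.
def canLaunchSpeculativeCopy(
    taskIndex: Int,
    offeredHost: String,
    hostsRunningTask: Map[Int, Set[String]]): Boolean = {
  !hostsRunningTask.getOrElse(taskIndex, Set.empty[String]).contains(offeredHost)
}
{code}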

In both of these cases, `ExecutorAllocationManager` does not know about the pending speculative task attempts and thinks that all resource demands are already taken care of, so it never requests additional executors. ([relevant code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265])
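
Roughly, the executor target in that code is derived only from the regular (non-speculative) task counts. A simplified sketch of the idea, not the exact code:

{code}
// Ceiling division: number of executors needed to cover regular demand.
// Pending *speculative* attempts are not counted here, so once regular
// demand is covered, no additional executors are requested and the
// speculative copies starve.
def maxNumExecutorsNeeded(
    pendingTasks: Int,
    runningTasks: Int,
    tasksPerExecutor: Int): Int = {
  (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
{code}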

This adds variance to job completion times and, more importantly, causes SLA misses :( In prod, with a large number of jobs, I see this happening more often than one would think. Chasing down the bad hosts or the root cause of the slowness doesn't scale.

Here is a tiny repro. Note that you need to launch this on a real cluster (Mesos, YARN, or standalone deploy mode) along with `spark.speculation=true`.
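
One possible invocation (YARN shown only as an example; dynamic allocation is enabled so that `ExecutorAllocationManager` is in the picture, which assumes the external shuffle service is available on the cluster):

{code}
spark-shell \
  --master yarn \
  --conf spark.speculation=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true
{code}

The repro itself: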

{code}
val someRDD = sc.parallelize(1 to 8, 8)
someRDD.mapPartitionsWithIndex { (index: Int, it: Iterator[Int]) =>
  // Partition indexes are 0-based (0 to 7), so stall the last partition
  // to fake long running task(s).
  if (index == 7) {
    Thread.sleep(Long.MaxValue)
  }
  it.toList.map(x => index + ", " + x).iterator
}.collect()
{code}
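
With the defaults (`spark.speculation.quantile=0.75`, `spark.speculation.multiplier=1.5`), seven of the eight tasks finish almost immediately and the stuck task soon becomes eligible for speculation, yet under either of the scenarios above its speculative copy never gets launched.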



