Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:21:16 UTC

[jira] [Updated] (SPARK-15385) Jobs never complete for ClusterManagers that don't implement killTask

     [ https://issues.apache.org/jira/browse/SPARK-15385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-15385:
---------------------------------
    Labels: bulk-closed  (was: )

> Jobs never complete for ClusterManagers that don't implement killTask
> ---------------------------------------------------------------------
>
>                 Key: SPARK-15385
>                 URL: https://issues.apache.org/jira/browse/SPARK-15385
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.6.1
>            Reporter: Imran Rashid
>            Priority: Minor
>              Labels: bulk-closed
>
> If a {{SchedulerBackend}} doesn't implement {{killTask}}, then a job that fails while any of its tasks are still running never completes.
> This would be a major bug, except that all of the existing SchedulerBackends implement {{killTask}}, so in practice this doesn't affect anyone.  It does complicate the scheduler code, though, so it would be nice to simplify this one way or the other.
> The problem stems from [this code | https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1473 ] in {{DAGScheduler}}:
> {code}
>          if (runningStages.contains(stage)) {
>             try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
>               taskScheduler.cancelTasks(stageId, shouldInterruptThread)
>               markStageAsFinished(stage, Some(failureReason))
>             } catch {
>               case e: UnsupportedOperationException =>
>                 logInfo(s"Could not cancel tasks for stage $stageId", e)
>                 ableToCancelStages = false
>             }
>           }
>         }
>       }
>     }
>     if (ableToCancelStages) {
>       job.listener.jobFailed(error)
>       cleanupStateForJobAndIndependentStages(job)
>       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
>     }
> {code}
> It *appears* to handle the case where a backend doesn't support {{killTask}}, but in fact, if the backend doesn't support it, we never perform any of the actions for the failed job, even after all the remaining running tasks come back.
> One fix would be to wait until we get the task-finished events from the backend, and then send all the job-failed events.  But (a) this is a bit complex and (b) you never know whether you'll get all those task-end events, e.g., if there is some issue on the executor where those remaining tasks are running.
> The much simpler option: just run all the job completion logic whether or not those tasks have been cancelled.  Even when the backend *does* support killing tasks, it's async, and so all the job-failed msgs get sent before we know the tasks have been killed.  I don't see how it's any worse in the case where the backend doesn't kill tasks at all.  If others are OK with that, I'd propose that we make {{killTask}} an abstract method in {{SchedulerBackend}}, to force backends to think about implementing it, but add a comment that implementations are free to make it a no-op.
> You can reproduce with this test (which leverages an open PR for SPARK-10372):
> {code}
> class BackendWithoutKillTaskIntegrationSuite
>     extends SchedulerIntegrationSuite[NoKillMultiExecutorBackend] {
>   testScheduler("job failure after 4 attempts when killTask is unsupported") {
>     def runBackend(): Unit = {
>       val task = backend.beginTask()
>       val failure = new ExceptionFailure(new RuntimeException("test task failure"), Seq())
>       backend.taskFailed(task, TaskState.FAILED, failure)
>     }
>     withBackend(runBackend _) {
>       val nParts = backend.defaultParallelism()
>       val jobFuture = submit(new MockRDD(sc, nParts, Nil), (0 until nParts).toArray)
>       val duration = Duration(10, SECONDS)
>       Await.ready(jobFuture, duration)
>       assert(failure.getMessage.contains("test task failure"))
>     }
>     assert(results.isEmpty)
>     assertDataStructuresEmpty(noFailure = false)
>   }
> }
> class NoKillMultiExecutorBackend(
>     conf: SparkConf,
>     taskScheduler: TaskSchedulerImpl) extends MultiExecutorMockBackend(conf, taskScheduler) {
>   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
>     throw new UnsupportedOperationException()
>   }
> }
> {code}
> In the logs you'll see something like:
> {noformat}
> 16/05/18 11:30:21 INFO scheduler.TaskSetManager: Lost task 19.3 in stage 0.0 (TID 46) on executor host-1: java.lang.RuntimeException (test task failure) [duplicate 9]
> 16/05/18 11:30:21 INFO scheduler.DAGScheduler: Could not cancel tasks for stage 0
> java.lang.UnsupportedOperationException
>         at org.apache.spark.scheduler.NoKillMultiExecutorBackend.killTask(SchedulerIntegrationSuite.scala:550)
>         at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:219)
>         at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:217)
> ...
> <more task completion msgs from all the other tasks that are running>
> ...
> 16/05/18 11:30:21 INFO scheduler.SchedulerIntegrationSuite$$anon$1: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 16/05/18 11:30:30 INFO scheduler.BackendWithoutKillTaskIntegrationSuite:
> ===== FINISHED o.a.s.scheduler.BackendWithoutKillTaskIntegrationSuite: 'job failure after 4 attempts when killTask is unsupported' =====
> [info] - job failure after 4 attempts when killTask is unsupported *** FAILED *** (10 seconds, 995 milliseconds)
> [info]   java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
> ...
> {noformat}
> Note that the taskset is removed after those remaining tasks complete, but despite waiting a whopping 10 seconds, our job never realizes it's done.
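
The difference between the gated behavior described in the issue and the simpler option it proposes can be sketched with a toy model. This is only an illustration under stated assumptions, not Spark's scheduler code: ToyBackend, ToyScheduler, failJobGated, failJobAlways and the event strings are all hypothetical names invented here. In particular, catching the exception in the "always" variant is an assumption made so that the two variants can be compared against the same backend.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a scheduler backend (not Spark's SchedulerBackend).
// killTask is abstract so every backend has to make an explicit choice.
trait ToyBackend {
  def killTask(taskId: Long): Unit
}

// A backend that cannot kill tasks and says so by making killTask a no-op.
final class NoOpBackend extends ToyBackend {
  def killTask(taskId: Long): Unit = ()
}

// A backend that does not support killTask and throws, as in the reproduction.
final class ThrowingBackend extends ToyBackend {
  def killTask(taskId: Long): Unit =
    throw new UnsupportedOperationException("killTask")
}

// Hypothetical, heavily simplified scheduler that only records job-end events.
final class ToyScheduler(backend: ToyBackend) {
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // Gated variant: job-failure actions run only if cancellation succeeded.
  def failJobGated(jobId: Int, runningTasks: Seq[Long]): Unit = {
    var ableToCancel = true
    try {
      runningTasks.foreach(id => backend.killTask(id))
    } catch {
      case _: UnsupportedOperationException => ableToCancel = false
    }
    if (ableToCancel) {
      events += s"JobEnd($jobId, failed)"
    }
  }

  // Simpler variant: cancellation is best effort, completion always runs.
  def failJobAlways(jobId: Int, runningTasks: Seq[Long]): Unit = {
    try {
      runningTasks.foreach(id => backend.killTask(id))
    } catch {
      case _: UnsupportedOperationException => () // assumption: ignore and move on
    }
    events += s"JobEnd($jobId, failed)"
  }
}
```

With a ThrowingBackend, failJobGated records no job-end event at all, which mirrors the hang described above, while failJobAlways still records one. With a NoOpBackend, both variants record the event, which is the point of letting implementations make killTask a no-op.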



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
