Posted to reviews@spark.apache.org by markhamstra <gi...@git.apache.org> on 2014/05/08 01:02:37 UTC

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

GitHub user markhamstra opened a pull request:

    https://github.com/apache/spark/pull/686

    [SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask

    It turns out that having the DAGScheduler tell the taskScheduler to cancelTasks when the backend does not implement killTask (e.g. Mesos) is not such a good idea.
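    The shape of the fix, reduced to a self-contained sketch. The `Backend`, `NoKillBackend`, and `CancelDemo` names below are illustrative stand-ins, not Spark's real interfaces: the point is only that an unsupported `killTask` is caught and logged instead of escaping the scheduler.

```scala
// Simplified stand-in for a scheduler backend; Spark's real trait is richer.
trait Backend {
  def killTask(taskId: Long): Unit
}

// Mirrors a backend (e.g. Mesos at the time) where killing is unsupported.
class NoKillBackend extends Backend {
  override def killTask(taskId: Long): Unit =
    throw new UnsupportedOperationException("killTask is not supported")
}

object CancelDemo {
  // Returns true only if the tasks were actually cancelled; an unsupported
  // backend is treated as a soft failure rather than crashing the caller.
  def cancelStage(backend: Backend, stageId: Int): Boolean =
    try {
      backend.killTask(stageId.toLong)
      true
    } catch {
      case _: UnsupportedOperationException =>
        println(s"Could not cancel tasks for stage $stageId")
        false
    }
}
```

    With a no-kill backend, `CancelDemo.cancelStage(new NoKillBackend, 1)` logs and returns false instead of propagating the exception.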

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markhamstra/spark SPARK-1749

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #686
    
----
commit 57fe87e75912cc9cd7ae9e5a0ed3f4236d46b80f
Author: Mark Hamstra <ma...@gmail.com>
Date:   2014-05-07T22:56:27Z

    Catch UnsupportedOperationException when DAGScheduler tries to
    cancel a job on a SchedulerBackend that does not implement killTask

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14037007
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    Sorry, Mark, one more question here -- can we move the job.listener.jobFailed(error) call from line 1041 to here in the "try" clause? It seems weird to tell the user the job has been cancelled when, in fact, it hasn't.



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-48011319
  
    @markhamstra mind closing this? It got merged through Kay's PR.



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14031997
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -313,6 +314,47 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         assertDataStructuresEmpty
       }
     
    +  test("job cancellation no-kill backend") {
    +    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
    +    // doesn't implement killTask()
    +    val noKillTaskScheduler = new TaskScheduler() {
    +      override def rootPool: Pool = null
    +      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
    +      override def start() = {}
    +      override def stop() = {}
    +      override def submitTasks(taskSet: TaskSet) = {
    +        // normally done by TaskSetManager
    +        taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
    --- End diff --
    
    are these lines necessary (can you just do nothing here?)
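    The kind of "do nothing" test double being suggested can be sketched self-contained as follows; `TaskSchedulerLike` and `NoKillTaskScheduler` are simplified stand-ins for Spark's `TaskScheduler`, not its actual API:

```scala
// Simplified stand-in for the TaskScheduler interface under test.
trait TaskSchedulerLike {
  def start(): Unit
  def stop(): Unit
  def submitTasks(taskIds: Seq[Long]): Unit
  def cancelTasks(stageId: Int): Unit
}

// Accepts submitted tasks silently and refuses cancellation, like a
// backend that does not implement killTask.
class NoKillTaskScheduler extends TaskSchedulerLike {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def submitTasks(taskIds: Seq[Long]): Unit = {} // just do nothing
  override def cancelTasks(stageId: Int): Unit =
    throw new UnsupportedOperationException("killTask not implemented")
}
```

    The test's job is then simply to drive a cancellation through this stub and check that the scheduler under test survives the thrown UnsupportedOperationException.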



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12408454
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
           case x: Exception =>
             logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
               .format(x.getMessage))
    -        dagScheduler.doCancelAllJobs()
    +        try {
    +          dagScheduler.doCancelAllJobs()
    +        } catch {
    +          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    --- End diff --
    
    Oh oh I see I didn't realize that this was part of the shut down code; this makes sense now



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42872809
  
    Merged build finished. 



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12498098
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
           case x: Exception =>
             logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
               .format(x.getMessage))
    -        dagScheduler.doCancelAllJobs()
    +        try {
    +          dagScheduler.doCancelAllJobs()
    +        } catch {
    +          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    --- End diff --
    
    @aarondav So shall I just back off to `case e: Exception =>` here and let Throwable be picked up in a larger refactoring of Akka exception handling?
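    In Scala, the conventional middle ground between `case e: Exception` and `case t: Throwable` is scala.util.control.NonFatal, which matches ordinary exceptions but lets fatal errors such as VirtualMachineError propagate. A minimal sketch of that pattern (`cancelAllJobsSafely` is an illustrative name, not the code that was merged):

```scala
import scala.util.control.NonFatal

// Runs the supplied cancellation action; non-fatal failures are logged and
// returned, while fatal JVM errors are allowed to propagate.
def cancelAllJobsSafely(doCancel: () => Unit): Option[Throwable] =
  try {
    doCancel()
    None
  } catch {
    case NonFatal(t) =>
      println(s"DAGScheduler failed to cancel all jobs: ${t.getMessage}")
      Some(t)
  }
```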



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-46709007
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12497895
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
           case x: Exception =>
             logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
               .format(x.getMessage))
    -        dagScheduler.doCancelAllJobs()
    +        try {
    +          dagScheduler.doCancelAllJobs()
    +        } catch {
    +          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    --- End diff --
    
    https://github.com/apache/spark/pull/715?



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42495440
  
    The INFO log should include the information that tasks were not cancelled. Where/how else do you want to see notification of those facts? Is adding more Listener events something we still want to contemplate in 1.0.0, or should something like that go into 1.1?



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-44346543
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42494319
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14790/



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12409225
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1055,10 +1055,16 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            } finally {
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    --- End diff --
    
    Good question. I'm mostly just trying to keep the DAGScheduler in a consistent state even when the backend doesn't support killing tasks; I'll admit I was working quickly to get this significant bug fix into 1.0.0 and hadn't fully thought this part through. If you can't see any use for the finally block unless taskScheduler.cancelTasks is successful, then we can drop the finally.
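    The behavioral difference being weighed here, reduced to a runnable sketch (`markWithFinally` and `markInsideTry` are illustrative names, not Spark code): with a `finally` block the stage bookkeeping runs whether or not the cancel throws, while inside the `try` it runs only after a successful cancel.

```scala
// With `finally`: the stage is marked failed on both paths, even when
// cancellation is unsupported.
def markWithFinally(cancel: () => Unit): Boolean = {
  var stageMarkedFailed = false
  try {
    cancel()
  } catch {
    case _: UnsupportedOperationException => () // swallowed
  } finally {
    stageMarkedFailed = true // runs whether or not cancel() threw
  }
  stageMarkedFailed
}

// Inside the `try`: the stage is marked failed only if cancel() succeeded.
def markInsideTry(cancel: () => Unit): Boolean = {
  var stageMarkedFailed = false
  try {
    cancel()
    stageMarkedFailed = true // reached only on a successful cancel
  } catch {
    case _: UnsupportedOperationException => ()
  }
  stageMarkedFailed
}
```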



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42494208
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-44343282
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14041296
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    What you suggest could be done, but there's a question of whether or not notification of cancellation of the job should be made regardless of whether any stages and tasks are successfully cancelled as a consequence. I don't really know how to answer that because I don't know how all of the listeners are using the notification or whether they are all expecting the same semantics.



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-45113730
  
    @rxin merge to 1.0.1 and 1.1.0?



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14041515
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    @pwendell what do you think here?



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42868032
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-44343268
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42497440
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14791/



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-44346546
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15234/



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14040631
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    Hmmm... not sure that I agree. A job being cancelled, stages being cancelled, and tasks being cancelled are all different things. The expectation is that job cancellation will lead to cancellation of independent stages and their associated tasks; but if no stages or tasks get cancelled, it's probably still worthwhile to send the information that the job itself was cancelled. I expect that eventually all of the backends will support task killing, so this whole no-kill path should never be hit. But moving the job cancellation notification within the try-to-cancelTasks block will result in multiple notifications that the parent job was cancelled -- one for each independent stage cancellation. Or am I misunderstanding what you are suggesting?



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-46713417
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15960/



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42495154
  
    So it looks like this commit fixes the crash issue (it would be nice to note in the test that its point is to make sure Spark doesn't crash). But it would also be good to get this message back to the user -- since right now, if the user tries to cancel her job, Spark will appear to have successfully cancelled the job when, in fact, it has not been cancelled. How hard is it to do this?



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12408196
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1055,10 +1055,16 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            } finally {
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    --- End diff --
    
    Why do this part even when the SchedulerBackend doesn't support cancellation?



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-47157319
  
    @markhamstra I'd like to pull this into a 1.0.1 RC that's going out today. This looks great, I just had a small comment. Are you around to address it? If not I can just make the change on merge.



[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14211667
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    Ok, either tonight or tomorrow I can update this PR to reflect that strategy, or you can go ahead and make the change @pwendell.  Outside the immediate scope of this PR, what prevents Mesos from being able to kill tasks, and when do we expect that to change? 


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42872810
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14899/


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14213344
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    I'll just take care of it... thanks mark!
    
    Task killing is not supported in fine-grained mode on Mesos because, in that mode, we use Mesos's built-in support for all of the control-plane messages relating to tasks, so we'll have to figure out how to support killing tasks in that model. There are two questions: one is who actually sends the "kill" message to the executor, and the other is how we tell Mesos that the cores the task was using are freed. In normal operation that's handled through the Mesos launchTask and sendStatusUpdate interfaces.
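
The backend asymmetry described above can be sketched roughly as follows. This is a simplified, hypothetical model, not Spark's actual `SchedulerBackend` interface: the trait and class names here are made up for illustration.

```scala
// Simplified sketch, NOT Spark's real API: a backend whose default killTask
// refuses, and a fine-grained Mesos-style backend that never overrides it
// because Mesos owns the control-plane messages (launchTask/statusUpdate).
trait SketchSchedulerBackend {
  def start(): Unit
  def stop(): Unit
  // Backends that can actually kill tasks override this.
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException("killTask is not supported by this backend")
}

class SketchFineGrainedMesosBackend extends SketchSchedulerBackend {
  override def start(): Unit = () // would register with the Mesos master
  override def stop(): Unit = ()  // would deregister
  // killTask deliberately not overridden: there is no Spark-side channel for
  // a "kill" message, nor a way to tell Mesos the task's cores are free.
}
```

In this sketch, a cancellation request against the Mesos-style backend surfaces as the `UnsupportedOperationException` that the DAGScheduler change in this PR catches.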


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12497829
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
           case x: Exception =>
             logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
               .format(x.getMessage))
    -        dagScheduler.doCancelAllJobs()
    +        try {
    +          dagScheduler.doCancelAllJobs()
    +        } catch {
    +          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    --- End diff --
    
    should probably not catch `Throwable` here (#712 is working towards reverting all of these)


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42494318
  
    Merged build finished. 


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-46706691
  
    Just a small comment on the tests but other than that this looks good


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12408395
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
           case x: Exception =>
             logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
               .format(x.getMessage))
    -        dagScheduler.doCancelAllJobs()
    +        try {
    +          dagScheduler.doCancelAllJobs()
    +        } catch {
    +          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    --- End diff --
    
    Who knows just what the SchedulerBackend is going to throw now or in the future? UnsupportedOperationException is handled in failJobAndIndependentStages, but if something else is thrown out of the backend, or doCancelAllJobs fails for any other reason, we'll just log it here and continue trying to shut down.
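
The defensive pattern being argued for here can be sketched as follows. The function names are illustrative, not the actual DAGSchedulerActorSupervisor code:

```scala
// Sketch of catch-everything-and-keep-shutting-down. The broad catch is
// deliberate: we cannot predict what a backend will throw during cancellation,
// and the shutdown of the SparkContext must proceed regardless.
def shutdownAfterFailure(cancelAllJobs: () => Unit, stopContext: () => Unit): Unit = {
  try {
    cancelAllJobs()
  } catch {
    case t: Throwable => println(s"DAGScheduler failed to cancel all jobs: $t")
  }
  stopContext() // reached even if cancellation threw
}
```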


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42494920
  
     Merged build triggered. 


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-46713416
  
    Merged build finished. All automated tests passed.


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra closed the pull request at:

    https://github.com/apache/spark/pull/686


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-46709020
  
    Merged build started. 


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14041700
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    Do we have the meaning of all the listener events fully documented someplace?  Or perhaps that needs to be done in a separate PR and then DAGScheduler updated to match the documented expectation?


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12498290
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
           case x: Exception =>
             logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
               .format(x.getMessage))
    -        dagScheduler.doCancelAllJobs()
    +        try {
    +          dagScheduler.doCancelAllJobs()
    +        } catch {
    +          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    --- End diff --
    
    Alas, I'll have to go with @markhamstra's original solution (catching Throwable) on this one. I didn't include Akka-related code in my refactoring for precisely the reason that we don't have a great solution for it right now. We can factor it back to Exception in a later cleanup when such a solution appears.


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r13106282
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1055,10 +1055,16 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            } finally {
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    --- End diff --
    
    Upon further review, I can't see any use (other than misleading the user) for posting stage cancellation to the SparkListenerBus when the SchedulerBackend does not support cancellation, so we won't do that.  


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42497438
  
    Merged build finished. All automated tests passed.


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42494931
  
    Merged build started. 


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-46684950
  
    ping: This should go into 1.0.1
    @pwendell


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42495867
  
    What will happen to the to-be-cancelled tasks in Mesos when the user wants to cancel them? Are they still running there? It seems that https://github.com/apache/spark/pull/686#discussion-diff-12408196R1065 marks the stage as cancelled while the tasks are actually still running.


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42494197
  
     Merged build triggered. 


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42496521
  
    If interruptThread is not `true`, then we are going to leave tasks running on the cluster after cancellation with other backends as well.  This is definitely an issue begging for further work, but I don't think we can go any further than https://github.com/apache/spark/pull/498 right now.
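
To illustrate why a task can outlive "cancellation" when `interruptThread` is false, here is a hedged toy example (not Spark code): a task loop that only stops if its thread's interrupt flag gets set.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Toy task loop: it exits early only when the thread's interrupt flag is set.
// If cancellation never interrupts the thread (interruptThread == false) and
// the task does not poll a kill flag, it simply runs to completion.
def runTask(iterations: Int, interruptAfter: Option[Int]): Int = {
  val completed = new AtomicInteger(0)
  val worker = new Thread(() => {
    var i = 0
    while (i < iterations && !Thread.currentThread().isInterrupted) {
      completed.incrementAndGet()
      i += 1
      // Simulate an interrupt arriving after a given number of iterations.
      if (interruptAfter.contains(i)) Thread.currentThread().interrupt()
    }
  })
  worker.start()
  worker.join()
  completed.get()
}
```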


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14211376
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    I think the correct semantics here are that we only trigger `jobFailed` if the job was successfully halted. This is mostly consumed downstream by tests, but it's also used by code supporting asynchronous actions and approximate results. In those cases, it would be better not to notify `jobFailed` if the cancellation doesn't succeed, because both assume that the job has finished executing if that message is received.
    
    Separately, we should probably update the documentation for cancel to explain that it is a "best effort" method that only takes effect if supported by the underlying scheduler; otherwise it acts as a no-op, i.e. as if `cancel` was never called. With this approach downstream consumers will only have two cases to worry about (a job was cancelled or it wasn't) rather than a third case, where we say it was cancelled but it secretly actually wasn't.
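
The proposed "best effort" contract could be sketched like this. `tryCancelTasks` and `notifyJobFailed` are hypothetical names invented for illustration, not Spark's API:

```scala
// Hypothetical sketch of best-effort cancellation: report whether the job was
// actually halted, and only notify jobFailed when it was. Downstream consumers
// then see exactly two states: cancelled, or as if cancel was never called.
def tryCancelTasks(backendSupportsKill: Boolean): Boolean =
  if (backendSupportsKill) {
    // ... tell the task scheduler to kill the running tasks ...
    true
  } else {
    false // act as a no-op; no "half cancelled" third state
  }

def cancelJob(backendSupportsKill: Boolean, notifyJobFailed: () => Unit): Unit =
  if (tryCancelTasks(backendSupportsKill)) notifyJobFailed()
```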


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42494200
  
    @CodingCat @kayousterhout @rxin


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14032954
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -313,6 +314,47 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         assertDataStructuresEmpty
       }
     
    +  test("job cancellation no-kill backend") {
    +    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
    +    // doesn't implement killTask()
    +    val noKillTaskScheduler = new TaskScheduler() {
    +      override def rootPool: Pool = null
    +      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
    +      override def start() = {}
    +      override def stop() = {}
    +      override def submitTasks(taskSet: TaskSet) = {
    +        // normally done by TaskSetManager
    +        taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
    --- End diff --
    
    Sure, doing nothing is easy.
    
    
    On Fri, Jun 20, 2014 at 10:47 AM, Kay Ousterhout <no...@github.com>
    wrote:
    
    > In core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
    >
    > > @@ -313,6 +314,47 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    > >      assertDataStructuresEmpty
    > >    }
    > >
    > > +  test("job cancellation no-kill backend") {
    > > +    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
    > > +    // doesn't implement killTask()
    > > +    val noKillTaskScheduler = new TaskScheduler() {
    > > +      override def rootPool: Pool = null
    > > +      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
    > > +      override def start() = {}
    > > +      override def stop() = {}
    > > +      override def submitTasks(taskSet: TaskSet) = {
    > > +        // normally done by TaskSetManager
    > > +        taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
    >
    > are these lines necessary (can you just do nothing here?)
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/686/files#r14031997>.
    >
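
The "just do nothing" stub suggested in the quoted review could look roughly like this. The trait here is a cut-down, hypothetical stand-in for Spark's TaskScheduler with simplified signatures:

```scala
// Cut-down stand-in for the TaskScheduler trait, just enough to exercise the
// DAGScheduler's UnsupportedOperationException path in a test.
trait TaskSchedulerStub {
  def submitTasks(numTasks: Int): Unit
  def cancelTasks(stageId: Int): Unit
}

val noKillTaskScheduler = new TaskSchedulerStub {
  // No epoch bookkeeping needed: submitting is simply a no-op in the stub.
  override def submitTasks(numTasks: Int): Unit = ()
  // Mirrors a backend with no killTask implementation.
  override def cancelTasks(stageId: Int): Unit =
    throw new UnsupportedOperationException("killTask not implemented")
}
```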


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/686#issuecomment-42868016
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r14040933
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1062,10 +1062,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +              val stageInfo = stageToInfos(stage)
    +              stageInfo.stageFailed(failureReason)
    +              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            } catch {
    +              case e: UnsupportedOperationException =>
    +                logInfo(s"Could not cancel tasks for stage $stageId", e)
    +            }
    --- End diff --
    
    Ah you're right that it doesn't make sense to add that here because it will be called for each stage.  My intention was that if the job has running stages that don't get cancelled (because the task scheduler doesn't implement cancelTasks()), then we should not call job.listener.jobFailed() -- do you think that makes sense?  Seems like the way to implement that would be to set a boolean flag here if the job can't be successfully cancelled, and then call jobFailed() 0 or 1 times at the end of this function depending on that flag.
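
The flag-based approach suggested here might look roughly like the following. The signatures are illustrative only; the real failJobAndIndependentStages operates on a job and its stage objects, not a list of stage ids:

```scala
// Sketch: remember whether every running stage was successfully cancelled and
// invoke jobFailed at most once, at the end, instead of once per stage.
def failJobAndIndependentStages(
    runningStageIds: Seq[Int],
    cancelStage: Int => Boolean, // true if the stage's tasks were cancelled
    jobFailed: () => Unit): Unit = {
  var allStagesCancelled = true
  for (stageId <- runningStageIds) {
    if (!cancelStage(stageId)) allStagesCancelled = false
  }
  if (allStagesCancelled) jobFailed() // called 0 or 1 times
}
```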


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12408178
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1055,10 +1055,15 @@ class DAGScheduler(
               // This is the only job that uses this stage, so fail the stage if it is running.
               val stage = stageIdToStage(stageId)
               if (runningStages.contains(stage)) {
    -            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    -            val stageInfo = stageToInfos(stage)
    -            stageInfo.stageFailed(failureReason)
    -            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
    +            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
    +            } catch {
    +              case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e)
    --- End diff --
    
    Can you change this message to explain why, i.e., that the scheduler being used doesn't support cancellation (and print which scheduler is in use)?


---

[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/686#discussion_r12408229
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
           case x: Exception =>
             logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
               .format(x.getMessage))
    -        dagScheduler.doCancelAllJobs()
    +        try {
    +          dagScheduler.doCancelAllJobs()
    +        } catch {
    +          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    --- End diff --
    
    What throwable do you get here?  It looks like the UnsupportedOperationException is caught in DAGScheduler and not re-thrown?


---