Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/26 05:57:52 UTC

git commit: [SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask

Repository: spark
Updated Branches:
  refs/heads/master 7f196b009 -> b88a59a66


[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask

This is a fixed-up version of #686 (cc @markhamstra @pwendell). The last commit (the only one I authored) reflects the changes I made to Mark's original patch.

Author: Mark Hamstra <ma...@gmail.com>
Author: Kay Ousterhout <ka...@gmail.com>

Closes #1219 from kayousterhout/mark-SPARK-1749 and squashes the following commits:

42dfa7e [Kay Ousterhout] Got rid of terrible double-negative name
80b3205 [Kay Ousterhout] Don't notify listeners of job failure if it wasn't successfully cancelled.
d156d33 [Mark Hamstra] Do nothing in no-kill submitTasks
9312baa [Mark Hamstra] code review update
cc353c8 [Mark Hamstra] scalastyle
e61f7f8 [Mark Hamstra] Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask
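
For context: the exception this patch catches originates in the scheduler layer
itself. The base SchedulerBackend ships a default killTask that throws, and
TaskSchedulerImpl.cancelTasks delegates to it, so cancelling a job on a backend
that never overrides killTask blew up inside the DAGScheduler. A rough sketch of
that path (paraphrased and simplified from Spark 1.0-era sources, not the exact
code):

    private[spark] trait SchedulerBackend {
      def start(): Unit
      def stop(): Unit
      def reviveOffers(): Unit
      def defaultParallelism(): Int

      // Backends that cannot kill tasks simply inherit this default.
      def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
        throw new UnsupportedOperationException
    }

    // TaskSchedulerImpl.cancelTasks (simplified): each running task is killed
    // through the backend, so the exception surfaces during job cancellation.
    override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
      activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
        tsm.runningTasksSet.foreach { tid =>
          backend.killTask(tid, taskIdToExecutorId(tid), interruptThread)
        }
        tsm.abort("Stage %d cancelled".format(stageId))
      }
    }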


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b88a59a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b88a59a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b88a59a6

Branch: refs/heads/master
Commit: b88a59a66845b8935b22f06fc96d16841ed20c94
Parents: 7f196b0
Author: Mark Hamstra <ma...@gmail.com>
Authored: Wed Jun 25 20:57:48 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Jun 25 20:57:48 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 30 ++++++++----
 .../spark/scheduler/DAGSchedulerSuite.scala     | 48 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b88a59a6/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b3ebaa5..c8559a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1038,7 +1038,7 @@ class DAGScheduler(
   private def failJobAndIndependentStages(job: ActiveJob, failureReason: String,
       resultStage: Option[Stage]) {
     val error = new SparkException(failureReason)
-    job.listener.jobFailed(error)
+    var ableToCancelStages = true
 
     val shouldInterruptThread =
       if (job.properties == null) false
@@ -1062,18 +1062,26 @@ class DAGScheduler(
           // This is the only job that uses this stage, so fail the stage if it is running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
-            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-            val stageInfo = stageToInfos(stage)
-            stageInfo.stageFailed(failureReason)
-            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+              val stageInfo = stageToInfos(stage)
+              stageInfo.stageFailed(failureReason)
+              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            } catch {
+              case e: UnsupportedOperationException =>
+                logInfo(s"Could not cancel tasks for stage $stageId", e)
+                ableToCancelStages = false
+            }
           }
         }
       }
     }
 
-    cleanupStateForJobAndIndependentStages(job, resultStage)
-
-    listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    if (ableToCancelStages) {
+      job.listener.jobFailed(error)
+      cleanupStateForJobAndIndependentStages(job, resultStage)
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    }
   }
 
   /**
@@ -1155,7 +1163,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
       case x: Exception =>
         logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
           .format(x.getMessage))
-        dagScheduler.doCancelAllJobs()
+        try {
+          dagScheduler.doCancelAllJobs()
+        } catch {
+          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
+        }
         dagScheduler.sc.stop()
         Stop
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b88a59a6/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4536832..8dd2a9b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -115,6 +115,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     sc = new SparkContext("local", "DAGSchedulerSuite")
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
+    failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
     cancelledStages.clear()
@@ -314,6 +315,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("job cancellation no-kill backend") {
+    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
+    // doesn't implement killTask()
+    val noKillTaskScheduler = new TaskScheduler() {
+      override def rootPool: Pool = null
+      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def start() = {}
+      override def stop() = {}
+      override def submitTasks(taskSet: TaskSet) = {
+        taskSets += taskSet
+      }
+      override def cancelTasks(stageId: Int, interruptThread: Boolean) {
+        throw new UnsupportedOperationException
+      }
+      override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+      override def defaultParallelism() = 2
+    }
+    val noKillScheduler = new DAGScheduler(
+      sc,
+      noKillTaskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env) {
+      override def runLocally(job: ActiveJob) {
+        // don't bother with the thread while unit testing
+        runLocallyWithinThread(job)
+      }
+    }
+    dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+      Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
+    val rdd = makeRdd(1, Nil)
+    val jobId = submit(rdd, Array(0))
+    cancel(jobId)
+    // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
+    assert(failure === null)
+
+    // When the task set completes normally, state should be correctly updated.
+    complete(taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.isEmpty)
+    assert(sparkListener.successfulStages.contains(0))
+  }
+
   test("run trivial shuffle") {
     val shuffleMapRdd = makeRdd(2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
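
For backends that do want cancellation to work, the contract is unchanged by
this patch: override killTask rather than inheriting the throwing default. A
hypothetical sketch (NoopKillBackend and its empty bodies are made up; only the
killTask signature follows the trait):

    private[spark] class NoopKillBackend extends SchedulerBackend {
      override def start(): Unit = {}
      override def stop(): Unit = {}
      override def reviveOffers(): Unit = {}
      override def defaultParallelism(): Int = 2

      // Overriding this is what lets cancelTasks succeed; the inherited
      // default throws UnsupportedOperationException.
      override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
        // deliver a kill request to the executor running taskId
      }
    }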