Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/24 01:53:57 UTC

git commit: SPARK-1582 Invoke Thread.interrupt() when cancelling jobs

Repository: spark
Updated Branches:
  refs/heads/master dd1b7a61d -> 432201c7e


SPARK-1582 Invoke Thread.interrupt() when cancelling jobs

Sometimes executor threads are blocked waiting for IO or monitors, and the current implementation of job cancellation may never recover these threads. By simply invoking Thread.interrupt() during cancellation, we can often safely unblock the threads and use them for subsequent work.

Note that this feature must remain optional for now because of a bug in HDFS, where Thread.interrupt() may cause nodes to be marked as permanently dead (the InterruptedException is reinterpreted as an IOException during communication with that node).
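
For reference, a minimal usage sketch of the API this commit adds. It mirrors the setJobGroup/cancelJobGroup example in the SparkContext doc comment below; the sleeping job and the app/master settings are purely illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setAppName("cancel-demo").setMaster("local[2]"))

    // Opt in to Thread.interrupt() on cancellation for jobs started from this
    // thread. This is off by default because of the HDFS-1208 issue noted above.
    sc.setJobGroup("some_job_to_cancel", "long-running demo job", interruptOnCancel = true)
    sc.parallelize(1 to 4, 4).foreach { _ => Thread.sleep(60 * 1000) }

    // In a separate thread: cancel the group; executor threads running its tasks
    // are interrupted because interruptOnCancel was set when the group was created.
    sc.cancelJobGroup("some_job_to_cancel")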

Author: Aaron Davidson <aa...@databricks.com>

Closes #498 from aarondav/cancel and squashes the following commits:

e52b829 [Aaron Davidson] Don't use job.properties when null
82f78bb [Aaron Davidson] Update DAGSchedulerSuite
b67f472 [Aaron Davidson] Add comment on why interruptOnCancel is in setJobGroup
4cb9fd6 [Aaron Davidson] SPARK-1582 Invoke Thread.interrupt() when cancelling jobs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/432201c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/432201c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/432201c7

Branch: refs/heads/master
Commit: 432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3
Parents: dd1b7a6
Author: Aaron Davidson <aa...@databricks.com>
Authored: Wed Apr 23 16:52:49 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Apr 23 16:52:49 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkContext.scala   | 15 ++++++++++++++-
 .../org/apache/spark/api/python/PythonRDD.scala      |  1 -
 .../executor/CoarseGrainedExecutorBackend.scala      |  4 ++--
 .../scala/org/apache/spark/executor/Executor.scala   | 10 +++++-----
 .../apache/spark/executor/MesosExecutorBackend.scala |  3 ++-
 .../org/apache/spark/scheduler/DAGScheduler.scala    |  6 +++++-
 .../apache/spark/scheduler/SchedulerBackend.scala    |  3 ++-
 .../main/scala/org/apache/spark/scheduler/Task.scala | 12 ++++++++++--
 .../org/apache/spark/scheduler/TaskScheduler.scala   |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala   |  4 ++--
 .../scala/org/apache/spark/scheduler/TaskSet.scala   |  4 ++--
 .../cluster/CoarseGrainedClusterMessage.scala        |  3 ++-
 .../cluster/CoarseGrainedSchedulerBackend.scala      |  8 ++++----
 .../apache/spark/scheduler/local/LocalBackend.scala  | 10 +++++-----
 .../apache/spark/scheduler/DAGSchedulerSuite.scala   |  2 +-
 15 files changed, 57 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c14dce8..dcb6b68 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -381,16 +381,27 @@ class SparkContext(config: SparkConf) extends Logging {
    * // In a separate thread:
    * sc.cancelJobGroup("some_job_to_cancel")
    * }}}
+   *
+   * If interruptOnCancel is set to true for the job group, then job cancellation will result
+   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
    */
-  def setJobGroup(groupId: String, description: String) {
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
+    // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
+    // APIs to also take advantage of this property (e.g., internal job failures or canceling from
+    // JobProgressTab UI) on a per-job basis.
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
   }
 
   /** Clear the current thread's job group ID and its description. */
   def clearJobGroup() {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
   }
 
   // Post init
@@ -1244,6 +1255,8 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
 
+  private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
 
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8a843fb..0d71fdb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -169,7 +169,6 @@ private[spark] class PythonRDD[T: ClassTag](
                 val update = new Array[Byte](updateLen)
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
-
               }
               Array.empty[Byte]
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6327ac0..9ac7365 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -69,12 +69,12 @@ private[spark] class CoarseGrainedExecutorBackend(
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
-    case KillTask(taskId, _) =>
+    case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
         logError("Received KillTask command but executor was null")
         System.exit(1)
       } else {
-        executor.killTask(taskId)
+        executor.killTask(taskId, interruptThread)
       }
 
     case x: DisassociatedEvent =>

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2bfb9c3..914bc20 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -136,10 +136,10 @@ private[spark] class Executor(
     threadPool.execute(tr)
   }
 
-  def killTask(taskId: Long) {
+  def killTask(taskId: Long, interruptThread: Boolean) {
     val tr = runningTasks.get(taskId)
     if (tr != null) {
-      tr.kill()
+      tr.kill(interruptThread)
     }
   }
 
@@ -166,11 +166,11 @@ private[spark] class Executor(
     @volatile private var killed = false
     @volatile private var task: Task[Any] = _
 
-    def kill() {
+    def kill(interruptThread: Boolean) {
       logInfo("Executor is trying to kill task " + taskId)
       killed = true
       if (task != null) {
-        task.kill()
+        task.kill(interruptThread)
       }
     }
 
@@ -257,7 +257,7 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
         }
 
-        case TaskKilledException => {
+        case TaskKilledException | _: InterruptedException if task.killed => {
           logInfo("Executor killed task " + taskId)
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
         }
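
The new catch clause above reports a task as KILLED only when a kill was actually requested; a stray InterruptedException raised by user code is still treated as an ordinary failure. A condensed, standalone sketch of that pattern (the object and method names here are hypothetical, not Spark's):

    object KillAwareRunner {
      @volatile var killed = false          // set by the kill path, as in TaskRunner

      def runWrapped(body: => Unit): String =
        try { body; "SUCCESS" }
        catch {
          // An interrupt only counts as a kill if killTask() actually set the flag.
          case _: InterruptedException if killed => "KILLED"
          case e: Throwable                      => "FAILED: " + e.getMessage
        }
    }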

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 6fc702f..64e2450 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -76,7 +76,8 @@ private[spark] class MesosExecutorBackend
     if (executor == null) {
       logError("Received KillTask but executor was null")
     } else {
-      executor.killTask(t.getValue.toLong)
+      // TODO: Determine the 'interruptOnCancel' property set for the given job.
+      executor.killTask(t.getValue.toLong, interruptThread = false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c6cbf14..dbde9b5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1055,6 +1055,10 @@ class DAGScheduler(
     val error = new SparkException(failureReason)
     job.listener.jobFailed(error)
 
+    val shouldInterruptThread =
+      if (job.properties == null) false
+      else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
+
     // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
@@ -1073,7 +1077,7 @@ class DAGScheduler(
           // This is the only job that uses this stage, so fail the stage if it is running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
-            taskScheduler.cancelTasks(stageId)
+            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
             val stageInfo = stageToInfos(stage)
             stageInfo.stageFailed(failureReason)
             listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index f1924a4..6a6d8e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -28,5 +28,6 @@ private[spark] trait SchedulerBackend {
   def reviveOffers(): Unit
   def defaultParallelism(): Int
 
-  def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
+  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
+    throw new UnsupportedOperationException
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index a8bcb7d..2ca3479 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -44,8 +44,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
 
   final def run(attemptId: Long): T = {
     context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+    taskThread = Thread.currentThread()
     if (_killed) {
-      kill()
+      kill(interruptThread = false)
     }
     runTask(context)
   }
@@ -62,6 +63,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // Task context, to be initialized in run().
   @transient protected var context: TaskContext = _
 
+  // The actual Thread on which the task is running, if any. Initialized in run().
+  @volatile @transient private var taskThread: Thread = _
+
   // A flag to indicate whether the task is killed. This is used in case context is not yet
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false
@@ -75,12 +79,16 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can
    * be called multiple times.
+   * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
    */
-  def kill() {
+  def kill(interruptThread: Boolean) {
     _killed = true
     if (context != null) {
       context.interrupted = true
     }
+    if (interruptThread && taskThread != null) {
+      taskThread.interrupt()
+    }
   }
 }
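
The interrupt is only effective if the code running inside the task can observe it. A minimal, hypothetical task body that cooperates with both signals, the interrupted flag set by kill() and the Thread.interrupt() delivered when interruptThread is true (the flag stands in for context.interrupted; everything else is illustrative):

    object CooperativeTaskBody {
      @volatile var interrupted = false     // stands in for TaskContext.interrupted

      def compute(items: Iterator[Array[Byte]]): Long = {
        var bytes = 0L
        while (items.hasNext) {
          // Polling covers CPU-bound loops; blocking I/O or monitor waits would
          // instead throw InterruptedException directly when the thread is interrupted.
          if (interrupted || Thread.currentThread().isInterrupted)
            throw new InterruptedException("task was killed")
          bytes += items.next().length
        }
        bytes
      }
    }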
 

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 92616c9..819c352 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -47,7 +47,7 @@ private[spark] trait TaskScheduler {
   def submitTasks(taskSet: TaskSet): Unit
 
   // Cancel a stage.
-  def cancelTasks(stageId: Int)
+  def cancelTasks(stageId: Int, interruptThread: Boolean)
 
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index fe72ab3..be19d9b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -170,7 +170,7 @@ private[spark] class TaskSchedulerImpl(
     backend.reviveOffers()
   }
 
-  override def cancelTasks(stageId: Int): Unit = synchronized {
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
       // There are two possible cases here:
@@ -181,7 +181,7 @@ private[spark] class TaskSchedulerImpl(
       //    simply abort the stage.
       tsm.runningTasksSet.foreach { tid =>
         val execId = taskIdToExecutorId(tid)
-        backend.killTask(tid, execId)
+        backend.killTask(tid, execId, interruptThread)
       }
       tsm.abort("Stage %s cancelled".format(stageId))
       logInfo("Stage %d was cancelled".format(stageId))

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 03bf760..613fa78 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,8 +31,8 @@ private[spark] class TaskSet(
     val properties: Properties) {
     val id: String = stageId + "." + attempt
 
-  def kill() {
-    tasks.foreach(_.kill())
+  def kill(interruptThread: Boolean) {
+    tasks.foreach(_.kill(interruptThread))
   }
 
   override def toString: String = "TaskSet " + id

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4a9a165..ddbc74e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,7 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {
   // Driver to executors
   case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
 
-  case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
+  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
+    extends CoarseGrainedClusterMessage
 
   case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
     extends CoarseGrainedClusterMessage

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7bfc30b..a6d6b3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -101,8 +101,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case ReviveOffers =>
         makeOffers()
 
-      case KillTask(taskId, executorId) =>
-        executorActor(executorId) ! KillTask(taskId, executorId)
+      case KillTask(taskId, executorId, interruptThread) =>
+        executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)
 
       case StopDriver =>
         sender ! true
@@ -207,8 +207,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     driverActor ! ReviveOffers
   }
 
-  override def killTask(taskId: Long, executorId: String) {
-    driverActor ! KillTask(taskId, executorId)
+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+    driverActor ! KillTask(taskId, executorId, interruptThread)
   }
 
   override def defaultParallelism(): Int = {

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 16e2f5c..43f0e18 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -30,7 +30,7 @@ private case class ReviveOffers()
 
 private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
 
-private case class KillTask(taskId: Long)
+private case class KillTask(taskId: Long, interruptThread: Boolean)
 
 /**
  * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
@@ -61,8 +61,8 @@ private[spark] class LocalActor(
         reviveOffers()
       }
 
-    case KillTask(taskId) =>
-      executor.killTask(taskId)
+    case KillTask(taskId, interruptThread) =>
+      executor.killTask(taskId, interruptThread)
   }
 
   def reviveOffers() {
@@ -99,8 +99,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
 
   override def defaultParallelism() = totalCores
 
-  override def killTask(taskId: Long, executorId: String) {
-    localActor ! KillTask(taskId)
+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+    localActor ! KillTask(taskId, interruptThread)
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {

http://git-wip-us.apache.org/repos/asf/spark/blob/432201c7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 35a7ac9..ff69eb7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -58,7 +58,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
     }
-    override def cancelTasks(stageId: Int) {
+    override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}