Posted to commits@spark.apache.org by rx...@apache.org on 2017/02/10 20:10:06 UTC

spark git commit: [SPARK-19549] Allow providing reason for stage/job cancelling

Repository: spark
Updated Branches:
  refs/heads/master 3a43ae7c0 -> d785217b7


[SPARK-19549] Allow providing reason for stage/job cancelling

## What changes were proposed in this pull request?

This change adds an optional argument to the `SparkContext.cancelStage()` and `SparkContext.cancelJob()` functions, which allows the caller to provide the exact reason for the cancellation.
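
For illustration, a minimal usage sketch of the new overloads (the `SparkContext` setup and the job/stage IDs below are placeholders for this example, not part of the change):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("cancel-example").setMaster("local[2]"))

// Pick a job ID to cancel, e.g. from the status tracker or a SparkListener.
val jobId = sc.statusTracker.getActiveJobIds().headOption.getOrElse(0)

// New overloads: the supplied reason is propagated through the DAGScheduler
// and included in the cancellation message of the failed job.
sc.cancelJob(jobId, "because the result is no longer needed")
sc.cancelStage(0, "stage 0 exceeded its time budget")

// The existing no-argument overloads keep the old behaviour (no reason recorded).
sc.cancelJob(jobId)
sc.cancelStage(0)
```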

## How was this patch tested?

Adds a unit test.

Author: Ala Luszczak <al...@databricks.com>

Closes #16887 from ala/cancel.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d785217b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d785217b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d785217b

Branch: refs/heads/master
Commit: d785217b791882e075ad537852d49d78fc1ca31b
Parents: 3a43ae7
Author: Ala Luszczak <al...@databricks.com>
Authored: Fri Feb 10 21:10:02 2017 +0100
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Feb 10 21:10:02 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 30 +++++++--
 .../apache/spark/scheduler/DAGScheduler.scala   | 35 ++++++----
 .../spark/scheduler/DAGSchedulerEvent.scala     | 10 ++-
 .../org/apache/spark/scheduler/JobWaiter.scala  |  2 +-
 .../org/apache/spark/SparkContextSuite.scala    | 69 +++++++++++++++++++-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +-
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 19 ++++--
 7 files changed, 138 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index eb13686..cbab7b8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2207,10 +2207,32 @@ class SparkContext(config: SparkConf) extends Logging {
    * Cancel a given job if it's scheduled or running.
    *
    * @param jobId the job ID to cancel
+   * @param reason optional reason for cancellation
    * @note Throws `InterruptedException` if the cancel message cannot be sent
    */
-  def cancelJob(jobId: Int) {
-    dagScheduler.cancelJob(jobId)
+  def cancelJob(jobId: Int, reason: String): Unit = {
+    dagScheduler.cancelJob(jobId, Option(reason))
+  }
+
+  /**
+   * Cancel a given job if it's scheduled or running.
+   *
+   * @param jobId the job ID to cancel
+   * @note Throws `InterruptedException` if the cancel message cannot be sent
+   */
+  def cancelJob(jobId: Int): Unit = {
+    dagScheduler.cancelJob(jobId, None)
+  }
+
+  /**
+   * Cancel a given stage and all jobs associated with it.
+   *
+   * @param stageId the stage ID to cancel
+   * @param reason reason for cancellation
+   * @note Throws `InterruptedException` if the cancel message cannot be sent
+   */
+  def cancelStage(stageId: Int, reason: String): Unit = {
+    dagScheduler.cancelStage(stageId, Option(reason))
   }
 
   /**
@@ -2219,8 +2241,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * @param stageId the stage ID to cancel
    * @note Throws `InterruptedException` if the cancel message cannot be sent
    */
-  def cancelStage(stageId: Int) {
-    dagScheduler.cancelStage(stageId)
+  def cancelStage(stageId: Int): Unit = {
+    dagScheduler.cancelStage(stageId, None)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6177baf..b9d7e13 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -696,9 +696,9 @@ class DAGScheduler(
   /**
    * Cancel a job that is running or waiting in the queue.
    */
-  def cancelJob(jobId: Int): Unit = {
+  def cancelJob(jobId: Int, reason: Option[String]): Unit = {
     logInfo("Asked to cancel job " + jobId)
-    eventProcessLoop.post(JobCancelled(jobId))
+    eventProcessLoop.post(JobCancelled(jobId, reason))
   }
 
   /**
@@ -719,7 +719,7 @@ class DAGScheduler(
   private[scheduler] def doCancelAllJobs() {
     // Cancel all running jobs.
     runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
-      reason = "as part of cancellation of all jobs"))
+      Option("as part of cancellation of all jobs")))
     activeJobs.clear() // These should already be empty by this point,
     jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
   }
@@ -727,8 +727,8 @@ class DAGScheduler(
   /**
    * Cancel all jobs associated with a running or scheduled stage.
    */
-  def cancelStage(stageId: Int) {
-    eventProcessLoop.post(StageCancelled(stageId))
+  def cancelStage(stageId: Int, reason: Option[String]) {
+    eventProcessLoop.post(StageCancelled(stageId, reason))
   }
 
   /**
@@ -785,7 +785,8 @@ class DAGScheduler(
       }
     }
     val jobIds = activeInGroup.map(_.jobId)
-    jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
+    jobIds.foreach(handleJobCancellation(_,
+        Option("part of cancelled job group %s".format(groupId))))
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
@@ -1377,24 +1378,30 @@ class DAGScheduler(
     }
   }
 
-  private[scheduler] def handleStageCancellation(stageId: Int) {
+  private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
     stageIdToStage.get(stageId) match {
       case Some(stage) =>
         val jobsThatUseStage: Array[Int] = stage.jobIds.toArray
         jobsThatUseStage.foreach { jobId =>
-          handleJobCancellation(jobId, s"because Stage $stageId was cancelled")
+          val reasonStr = reason match {
+            case Some(originalReason) =>
+              s"because $originalReason"
+            case None =>
+              s"because Stage $stageId was cancelled"
+          }
+          handleJobCancellation(jobId, Option(reasonStr))
         }
       case None =>
         logInfo("No active jobs to kill for Stage " + stageId)
     }
   }
 
-  private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
+  private[scheduler] def handleJobCancellation(jobId: Int, reason: Option[String]) {
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
       failJobAndIndependentStages(
-        jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
+        jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason.getOrElse("")))
     }
   }
 
@@ -1636,11 +1643,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
       dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
 
-    case StageCancelled(stageId) =>
-      dagScheduler.handleStageCancellation(stageId)
+    case StageCancelled(stageId, reason) =>
+      dagScheduler.handleStageCancellation(stageId, reason)
 
-    case JobCancelled(jobId) =>
-      dagScheduler.handleJobCancellation(jobId)
+    case JobCancelled(jobId, reason) =>
+      dagScheduler.handleJobCancellation(jobId, reason)
 
     case JobGroupCancelled(groupId) =>
       dagScheduler.handleJobGroupCancelled(groupId)

http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 03781a2..cda0585 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -53,9 +53,15 @@ private[scheduler] case class MapStageSubmitted(
   properties: Properties = null)
   extends DAGSchedulerEvent
 
-private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
+private[scheduler] case class StageCancelled(
+    stageId: Int,
+    reason: Option[String])
+  extends DAGSchedulerEvent
 
-private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
+private[scheduler] case class JobCancelled(
+    jobId: Int,
+    reason: Option[String])
+  extends DAGSchedulerEvent
 
 private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 9012289..65d7184 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -50,7 +50,7 @@ private[spark] class JobWaiter[T](
    * will fail this job with a SparkException.
    */
   def cancel() {
-    dagScheduler.cancelJob(jobId)
+    dagScheduler.cancelJob(jobId, None)
   }
 
   override def taskSucceeded(index: Int, result: Any): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 8ae5d2f..5a41e1c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -22,19 +22,21 @@ import java.net.MalformedURLException
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
+import scala.concurrent.duration._
 import scala.concurrent.Await
-import scala.concurrent.duration.Duration
 
 import com.google.common.io.Files
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+import org.scalatest.concurrent.Eventually
 import org.scalatest.Matchers._
 
-import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
 import org.apache.spark.util.Utils
 
-class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
+
+class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
 
   test("Only one SparkContext may be active at a time") {
     // Regression test for SPARK-4180
@@ -465,4 +467,65 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     assert(!sc.listenerBus.listeners.contains(sparkListener1))
     assert(sc.listenerBus.listeners.contains(sparkListener2))
   }
+
+  test("Cancelling stages/jobs with custom reasons.") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    val REASON = "You shall not pass"
+
+    val listener = new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        if (SparkContextSuite.cancelStage) {
+          eventually(timeout(10.seconds)) {
+            assert(SparkContextSuite.isTaskStarted)
+          }
+          sc.cancelStage(taskStart.stageId, REASON)
+          SparkContextSuite.cancelStage = false
+        }
+      }
+
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        if (SparkContextSuite.cancelJob) {
+          eventually(timeout(10.seconds)) {
+            assert(SparkContextSuite.isTaskStarted)
+          }
+          sc.cancelJob(jobStart.jobId, REASON)
+          SparkContextSuite.cancelJob = false
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+
+    for (cancelWhat <- Seq("stage", "job")) {
+      SparkContextSuite.isTaskStarted = false
+      SparkContextSuite.cancelStage = (cancelWhat == "stage")
+      SparkContextSuite.cancelJob = (cancelWhat == "job")
+
+      val ex = intercept[SparkException] {
+        sc.range(0, 10000L).mapPartitions { x =>
+          org.apache.spark.SparkContextSuite.isTaskStarted = true
+          x
+        }.cartesian(sc.range(0, 10L)).count()
+      }
+
+      ex.getCause() match {
+        case null =>
+          assert(ex.getMessage().contains(REASON))
+        case cause: SparkException =>
+          assert(cause.getMessage().contains(REASON))
+        case cause: Throwable =>
+          fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
+      }
+
+      eventually(timeout(20.seconds)) {
+        assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+      }
+    }
+  }
+
+}
+
+object SparkContextSuite {
+  @volatile var cancelJob = false
+  @volatile var cancelStage = false
+  @volatile var isTaskStarted = false
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f3d3f70..4e5f267 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -329,7 +329,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
   /** Sends JobCancelled to the DAG scheduler. */
   private def cancel(jobId: Int) {
-    runEvent(JobCancelled(jobId))
+    runEvent(JobCancelled(jobId, None))
   }
 
   test("[SPARK-3353] parent stage should have lower stage id") {

http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 3ebfd9a..03bf2d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 
 class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually {
+  import testImplicits._
 
   test("SPARK-7150 range api") {
     // numSlice is greater than length
@@ -137,7 +138,9 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-        Thread.sleep(100)
+        eventually(timeout(10.seconds)) {
+          assert(DataFrameRangeSuite.isTaskStarted)
+        }
         sparkContext.cancelStage(taskStart.stageId)
       }
     }
@@ -145,9 +148,12 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
+        DataFrameRangeSuite.isTaskStarted = false
         val ex = intercept[SparkException] {
-          spark.range(100000L).crossJoin(spark.range(100000L))
-            .toDF("a", "b").agg(sum("a"), sum("b")).collect()
+          spark.range(100000L).mapPartitions { x =>
+            DataFrameRangeSuite.isTaskStarted = true
+            x
+          }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), sum("b")).collect()
         }
         ex.getCause() match {
           case null =>
@@ -155,7 +161,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
           case cause: SparkException =>
             assert(cause.getMessage().contains("cancelled"))
           case cause: Throwable =>
-            fail("Expected the casue to be SparkException, got " + cause.toString() + " instead.")
+            fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
         }
       }
       eventually(timeout(20.seconds)) {
@@ -164,3 +170,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
     }
   }
 }
+
+object DataFrameRangeSuite {
+  @volatile var isTaskStarted = false
+}
+

