You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2021/01/12 19:20:18 UTC
[spark] branch master updated: [SPARK-34069][CORE] Kill barrier
tasks should respect SPARK_JOB_INTERRUPT_ON_CANCEL
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 65222b7 [SPARK-34069][CORE] Kill barrier tasks should respect SPARK_JOB_INTERRUPT_ON_CANCEL
65222b7 is described below
commit 65222b705109b91b89b1aa5ca811bf667cc422cd
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Jan 12 13:19:45 2021 -0600
[SPARK-34069][CORE] Kill barrier tasks should respect SPARK_JOB_INTERRUPT_ON_CANCEL
### What changes were proposed in this pull request?
Add a `shouldInterruptTaskThread` check when killing barrier tasks.
### Why are the changes needed?
We should interrupt the task thread if the user has set the local property `SPARK_JOB_INTERRUPT_ON_CANCEL` to true.
### Does this PR introduce _any_ user-facing change?
Yes, barrier tasks will be interrupted when they are killed if the user has set `SPARK_JOB_INTERRUPT_ON_CANCEL` to true.
### How was this patch tested?
Added a test to `BarrierTaskContextSuite`.
Closes #31127 from ulysses-you/SPARK-34069.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../org/apache/spark/scheduler/DAGScheduler.scala | 4 +-
.../spark/scheduler/BarrierTaskContextSuite.scala | 43 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 02f5bb8..f333cee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1916,7 +1916,9 @@ private[spark] class DAGScheduler(
// killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
"failed."
- taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason)
+ val job = jobIdToActiveJob.get(failedStage.firstJobId)
+ val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
+ taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)
} catch {
case e: UnsupportedOperationException =>
// Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index b7ac9ec..55ea2fc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -289,4 +289,47 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}.getMessage
assert(errorMsg.contains("Fail resource offers for barrier stage"))
}
+
+ test("SPARK-34069: Kill barrier tasks should respect SPARK_JOB_INTERRUPT_ON_CANCEL") {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
+ var index = 0 // progress marker: 0 = no task end seen, 1 = failure seen, 2 = kill seen
+ var checkDone = false // set to true once both expected task-end events were checked
+ var startTime = 0L // launchTime of the first task-start event observed (0 = not yet set)
+ val listener = new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ if (startTime == 0) {
+ startTime = taskStart.taskInfo.launchTime
+ }
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ if (index == 0) { // first task end: expected to be the task that throws below
+ assert(taskEnd.reason.isInstanceOf[ExceptionFailure])
+ assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000)
+ index = 1
+ } else if (index == 1) { // second task end: expected to be the sleeping task, killed
+ assert(taskEnd.reason.isInstanceOf[TaskKilled])
+ assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000) // far below the 5000 ms sleep
+ index = 2
+ checkDone = true
+ }
+ }
+ }
+ sc.addSparkListener(listener)
+ sc.setJobGroup("test", "", true) // third argument is interruptOnCancel = true
+ sc.parallelize(Seq(1, 2), 2).barrier().mapPartitions { it =>
+ if (TaskContext.get().stageAttemptNumber() == 0) { // only the first stage attempt fails or sleeps
+ if (it.hasNext && it.next() == 1) {
+ throw new RuntimeException("failed") // the partition holding element 1 fails immediately
+ } else {
+ Thread.sleep(5000) // the other partition sleeps; the asserts expect this to be cut short
+ }
+ }
+ it
+ }.groupBy(x => x).collect()
+ sc.listenerBus.waitUntilEmpty()
+ assert(checkDone)
+ // double check the sleeping task was killed: total elapsed time is below its 5000 ms sleep
+ assert(System.currentTimeMillis() - startTime < 5000)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org