You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2021/01/12 19:21:07 UTC

[spark] branch branch-3.1 updated: [SPARK-34069][CORE] Kill barrier tasks should respect SPARK_JOB_INTERRUPT_ON_CANCEL

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 983dbb9  [SPARK-34069][CORE] Kill barrier tasks should respect SPARK_JOB_INTERRUPT_ON_CANCEL
983dbb9 is described below

commit 983dbb9dfa96b4c76d93574632462d201255c01d
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Jan 12 13:19:45 2021 -0600

    [SPARK-34069][CORE] Kill barrier tasks should respect SPARK_JOB_INTERRUPT_ON_CANCEL
    
    ### What changes were proposed in this pull request?
    
    Add a `shouldInterruptTaskThread` check when killing barrier tasks.
    
    ### Why are the changes needed?
    
    We should interrupt the task thread if the user sets the local property `SPARK_JOB_INTERRUPT_ON_CANCEL` to true.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, barrier tasks will be interrupted when killed if the user sets `SPARK_JOB_INTERRUPT_ON_CANCEL` to true.
    
    ### How was this patch tested?
    
    Add test.
    
    Closes #31127 from ulysses-you/SPARK-34069.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 65222b705109b91b89b1aa5ca811bf667cc422cd)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  4 +-
 .../spark/scheduler/BarrierTaskContextSuite.scala  | 43 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 02f5bb8..f333cee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1916,7 +1916,9 @@ private[spark] class DAGScheduler(
             // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
             val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
               "failed."
-            taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason)
+            val job = jobIdToActiveJob.get(failedStage.firstJobId)
+            val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
+            taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)
           } catch {
             case e: UnsupportedOperationException =>
               // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index b7ac9ec..55ea2fc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -289,4 +289,47 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
     }.getMessage
     assert(errorMsg.contains("Fail resource offers for barrier stage"))
   }
+
+  test("SPARK-34069: Kill barrier tasks should respect SPARK_JOB_INTERRUPT_ON_CANCEL") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
+    var index = 0 // 0: expect the task that throws, 1: expect its killed sibling
+    var checkDone = false // set only after both expected task-end events were seen
+    var startTime = 0L // launch time of the first task to start
+    val listener = new SparkListener { // delivered asynchronously via sc.listenerBus
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        if (startTime == 0) { // record only the first task start
+          startTime = taskStart.taskInfo.launchTime
+        }
+      }
+
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        if (index == 0) { // first task end: the partition that threw
+          assert(taskEnd.reason.isInstanceOf[ExceptionFailure])
+          assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000)
+          index = 1
+        } else if (index == 1) { // second task end: the sibling blocked in Thread.sleep(5000)
+          assert(taskEnd.reason.isInstanceOf[TaskKilled]) // killed, not completed
+          assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000)
+          index = 2 // the < 1000 check shows it was interrupted, not left to sleep 5s
+          checkDone = true // asserts in callbacks may only be logged; the test checks this flag
+        } // once index == 2, any later task ends (e.g. the retry) are ignored
+      }
+    }
+    sc.addSparkListener(listener)
+    sc.setJobGroup("test", "", true) // interruptOnCancel -> SPARK_JOB_INTERRUPT_ON_CANCEL
+    sc.parallelize(Seq(1, 2), 2).barrier().mapPartitions { it =>
+      if (TaskContext.get().stageAttemptNumber() == 0) { // retried attempts run normally
+        if (it.hasNext && it.next() == 1) { // partition holding 1 fails immediately
+          throw new RuntimeException("failed")
+        } else {
+          Thread.sleep(5000) // the other partition blocks; only an interrupt ends it early
+        }
+      }
+      it
+    }.groupBy(x => x).collect()
+    sc.listenerBus.waitUntilEmpty() // drain queued events before reading listener state
+    assert(checkDone)
+    // double-check the whole run ended before the 5s sleep could have completed
+    assert(System.currentTimeMillis() - startTime < 5000)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org