Posted to commits@spark.apache.org by js...@apache.org on 2017/09/28 01:25:32 UTC

spark git commit: [SPARK-22123][CORE] Add latest failure reason for task set blacklist

Repository: spark
Updated Branches:
  refs/heads/master 7bf4da8a3 -> 3b117d631


[SPARK-22123][CORE] Add latest failure reason for task set blacklist

## What changes were proposed in this pull request?
This patch adds the latest failure reason to the task set blacklist, so it can be shown on the Spark UI and users can see the failure reason directly.
Until now, every job aborted because the blacklist was complete only logged a message like the one below, which carries no further information:
`Aborting $taskSet because task $indexInTaskSet (partition $partition) cannot run anywhere due to node and executor blacklist.  Blacklisting behavior can be configured via spark.blacklist.*.`
**After this change:**
```
Aborting TaskSet 0.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Some(Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: Fake error!
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
).

Blacklisting behavior can be configured via spark.blacklist.*.

```
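For context, this abort path is only reached when blacklisting is enabled. A minimal sketch of the kind of `spark.blacklist.*` configuration involved (key names are from the Spark 2.x configuration namespace the message refers to; the values shown are illustrative, not defaults to rely on):
```
import org.apache.spark.SparkConf

// Illustrative only: enable task-set blacklisting so that repeated task
// failures can lead to the abort message shown above.
val conf = new SparkConf()
  .setAppName("blacklist-demo")
  .set("spark.blacklist.enabled", "true")
  // failures of one task on one executor before that executor is
  // blacklisted for the task
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
  // distinct failed tasks on one executor before the executor is
  // blacklisted for the whole stage
  .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
```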

## How was this patch tested?

Unit tests and manual testing.

Author: zhoukang <zh...@gmail.com>

Closes #19338 from caneGuy/zhoukang/improve-blacklist.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b117d63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b117d63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b117d63

Branch: refs/heads/master
Commit: 3b117d631e1ff387b70ed8efba229594f4594db5
Parents: 7bf4da8
Author: zhoukang <zh...@gmail.com>
Authored: Thu Sep 28 09:25:21 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Sep 28 09:25:21 2017 +0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskSetBlacklist.scala      | 14 ++++-
 .../apache/spark/scheduler/TaskSetManager.scala | 15 +++--
 .../scheduler/BlacklistIntegrationSuite.scala   |  5 +-
 .../spark/scheduler/BlacklistTrackerSuite.scala | 60 +++++++++++++-------
 .../scheduler/TaskSchedulerImplSuite.scala      | 11 +++-
 .../spark/scheduler/TaskSetBlacklistSuite.scala | 45 ++++++++++-----
 .../spark/scheduler/TaskSetManagerSuite.scala   |  2 +-
 7 files changed, 104 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
index e815b7e..233781f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
@@ -61,6 +61,16 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
   private val blacklistedExecs = new HashSet[String]()
   private val blacklistedNodes = new HashSet[String]()
 
+  private var latestFailureReason: String = null
+
+  /**
+   * Get the most recent failure reason of this TaskSet.
+   * @return
+   */
+  def getLatestFailureReason: String = {
+    latestFailureReason
+  }
+
   /**
    * Return true if this executor is blacklisted for the given task.  This does *not*
    * need to return true if the executor is blacklisted for the entire stage, or blacklisted
@@ -94,7 +104,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
   private[scheduler] def updateBlacklistForFailedTask(
       host: String,
       exec: String,
-      index: Int): Unit = {
+      index: Int,
+      failureReason: String): Unit = {
+    latestFailureReason = failureReason
     val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
     execFailures.updateWithFailure(index, clock.getTimeMillis())
 

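The change above is simple bookkeeping: remember the reason string from the most recent failed task next to the existing per-executor failure counts, and expose it through a getter. A standalone sketch of that pattern (illustrative class and field names, not the actual Spark internals):
```
import scala.collection.mutable

// Illustration of the pattern used in TaskSetBlacklist above: record the
// most recent failure reason alongside the per-executor failure counters.
class FailureBookkeeping {
  private val failuresPerExec = new mutable.HashMap[String, Int]()
  private var latestFailureReason: String = null

  def getLatestFailureReason: String = latestFailureReason

  def recordFailure(exec: String, failureReason: String): Unit = {
    latestFailureReason = failureReason
    failuresPerExec(exec) = failuresPerExec.getOrElse(exec, 0) + 1
  }
}
```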
http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3804ea8..bb86741 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -670,9 +670,14 @@ private[spark] class TaskSetManager(
           }
           if (blacklistedEverywhere) {
             val partition = tasks(indexInTaskSet).partitionId
-            abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
-              s"cannot run anywhere due to node and executor blacklist.  Blacklisting behavior " +
-              s"can be configured via spark.blacklist.*.")
+            abort(s"""
+              |Aborting $taskSet because task $indexInTaskSet (partition $partition)
+              |cannot run anywhere due to node and executor blacklist.
+              |Most recent failure:
+              |${taskSetBlacklist.getLatestFailureReason}
+              |
+              |Blacklisting behavior can be configured via spark.blacklist.*.
+              |""".stripMargin)
           }
         }
       }
@@ -837,9 +842,9 @@ private[spark] class TaskSetManager(
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
 
     if (!isZombie && reason.countTowardsTaskFailures) {
-      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
-        info.host, info.executorId, index))
       assert (null != failureReason)
+      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
+        info.host, info.executorId, index, failureReason))
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
         logError("Task %d in stage %s failed %d times; aborting job".format(

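The new abort message is built with a multi-line interpolated string plus `stripMargin`. A small sketch of how that formatting behaves, using made-up values in place of the scheduler's variables:
```
val taskSet = "TaskSet 0.0"
val indexInTaskSet = 0
val partition = 0
val latestFailureReason = "java.lang.Exception: Fake error!"

// stripMargin removes everything up to and including the leading '|' on
// each line, so the source can stay indented while the rendered message
// starts flush left.
val message = s"""
  |Aborting $taskSet because task $indexInTaskSet (partition $partition)
  |cannot run anywhere due to node and executor blacklist.
  |Most recent failure:
  |$latestFailureReason
  |
  |Blacklisting behavior can be configured via spark.blacklist.*.
  |""".stripMargin

println(message)
```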
http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index f6015cd..d3bbfd1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -115,8 +115,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
-      val pattern = ("Aborting TaskSet 0.0 because task .* " +
-        "cannot run anywhere due to node and executor blacklist").r
+      val pattern = (
+        s"""|Aborting TaskSet 0.0 because task .*
+            |cannot run anywhere due to node and executor blacklist""".stripMargin).r
       assert(pattern.findFirstIn(failure.getMessage).isDefined,
         s"Couldn't find $pattern in ${failure.getMessage()}")
     }

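Because the abort reason now spans several lines, the integration test above matches only its first two lines with an unanchored regex. A tiny sketch of how that works on a multi-line string (the message text here is a hypothetical stand-in):
```
// Hypothetical message resembling the new multi-line abort reason.
val failureMessage =
  """|Aborting TaskSet 0.0 because task 0 (partition 0)
     |cannot run anywhere due to node and executor blacklist.
     |Most recent failure:
     |Some(...)""".stripMargin

// The pattern keeps an embedded newline, so '.*' only needs to match to the
// end of the first line; findFirstIn then locates this two-line prefix
// anywhere inside the larger message.
val pattern =
  """|Aborting TaskSet 0.0 because task .*
     |cannot run anywhere due to node and executor blacklist""".stripMargin.r

assert(pattern.findFirstIn(failureMessage).isDefined)
```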
http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index a136d69..cd1b7a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -110,7 +110,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
       val taskSetBlacklist = createTaskSetBlacklist(stageId)
       if (stageId % 2 == 0) {
         // fail one task in every other taskset
-        taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+        taskSetBlacklist.updateBlacklistForFailedTask(
+          "hostA", exec = "1", index = 0, failureReason = "testing")
         failuresSoFar += 1
       }
       blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
@@ -132,7 +133,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // for many different stages, executor 1 fails a task, and then the taskSet fails.
     (0 until failuresUntilBlacklisted * 10).foreach { stage =>
       val taskSetBlacklist = createTaskSetBlacklist(stage)
-      taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+      taskSetBlacklist.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = 0, failureReason = "testing")
     }
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
   }
@@ -147,7 +149,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
       val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
         conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
       (0 until numFailures).foreach { index =>
-        taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = index)
+        taskSetBlacklist.updateBlacklistForFailedTask(
+          "hostA", exec = "1", index = index, failureReason = "testing")
       }
       assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
       assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
@@ -170,7 +173,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
     assert(blacklist.nodeBlacklist() === Set())
@@ -183,7 +187,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // application.  Since that's the second executor that is blacklisted on the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
     assert(blacklist.nodeBlacklist() === Set("hostA"))
@@ -207,7 +212,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Fail one more task, but executor isn't put back into blacklist since the count of failures
     // on that executor should have been reset to 0.
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
     assert(blacklist.nodeBlacklist() === Set())
     assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
@@ -221,7 +227,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Lets say that executor 1 dies completely.  We get some task failures, but
     // the taskset then finishes successfully (elsewhere).
     (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.handleRemovedExecutor("1")
     blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -236,7 +243,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Now another executor gets spun up on that host, but it also dies.
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.handleRemovedExecutor("2")
     blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -279,7 +287,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
 
     def failOneTaskInTaskSet(exec: String): Unit = {
       val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
-      taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
+      taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0, "testing")
       blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
       stageId += 1
     }
@@ -354,12 +362,12 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
     // Taskset1 has one failure immediately
-    taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
+    taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
     // Then we have a *long* delay, much longer than the timeout, before any other failures or
     // taskset completion
     clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
     // After the long delay, we have one failure on taskset 2, on the same executor
-    taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
+    taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
     // Finally, we complete both tasksets.  Its important here to complete taskset2 *first*.  We
     // want to make sure that when taskset 1 finishes, even though we've now got two task failures,
     // we realize that the task failure we just added was well before the timeout.
@@ -377,16 +385,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // we blacklist executors on two different hosts -- make sure that doesn't lead to any
     // node blacklisting
     val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
-    taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
-    taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    taskSetBlacklist0.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist0.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 2))
     assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
 
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
-    taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
-    taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+    taskSetBlacklist1.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 0, failureReason = "testing")
+    taskSetBlacklist1.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "2", 2))
@@ -395,8 +407,10 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Finally, blacklist another executor on the same node as the original blacklisted executor,
     // and make sure this time we *do* blacklist the node.
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 0)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 1)
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 0, failureReason = "testing")
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", "3"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "3", 2))
@@ -486,7 +500,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
 
@@ -497,7 +512,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // application.  Since that's the second executor that is blacklisted on the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
 
@@ -512,7 +528,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+      taskSetBlacklist2.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
 
@@ -523,7 +540,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // application.  Since that's the second executor that is blacklisted on the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+      taskSetBlacklist3.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index b8626bf..6003899 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -660,9 +660,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(tsm.isZombie)
     assert(failedTaskSet)
     val idx = failedTask.index
-    assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
-      s"cannot run anywhere due to node and executor blacklist.  Blacklisting behavior can be " +
-      s"configured via spark.blacklist.*.")
+    assert(failedTaskSetReason === s"""
+      |Aborting $taskSet because task $idx (partition $idx)
+      |cannot run anywhere due to node and executor blacklist.
+      |Most recent failure:
+      |${tsm.taskSetBlacklistHelperOpt.get.getLatestFailureReason}
+      |
+      |Blacklisting behavior can be configured via spark.blacklist.*.
+      |""".stripMargin)
   }
 
   test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
index f1392e9..18981d5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
@@ -37,7 +37,8 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
 
     // First, mark task 0 as failed on exec1.
     // task 0 should be blacklisted on exec1, and nowhere else
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec1", index = 0, failureReason = "testing")
     for {
       executor <- (1 to 4).map(_.toString)
       index <- 0 until 10
@@ -49,17 +50,20 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
     // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec2", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
     // Mark another task as failed on exec2 -- now we blacklist exec2, which also leads to
     // blacklisting the entire node.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec2", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
     assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
@@ -108,34 +112,41 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
       .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3)
     val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
     // Fail a task twice on hostA, exec:1
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTask("1", 0))
     assert(!taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail the same task once more on hostA, exec:2
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "2", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail another task on hostA, exec:1.  Now that executor has failures on two different tasks,
     // so its blacklisted
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail a third task on hostA, exec:2, so that exec is blacklisted for the whole task set
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 2)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "2", index = 2, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail a fourth & fifth task on hostA, exec:3.  Now we've got three executors that are
     // blacklisted for the taskset, so blacklist the whole node.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 3)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 4)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 3, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 4, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("3"))
     assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
   }
@@ -147,13 +158,17 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
     val conf = new SparkConf().setAppName("test").setMaster("local")
       .set(config.BLACKLIST_ENABLED.key, "true")
     val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
-    taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ae43f4c..5c712bd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1146,7 +1146,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // Make sure that the blacklist ignored all of the task failures above, since they aren't
     // the fault of the executor where the task was running.
     verify(blacklist, never())
-      .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
+      .updateBlacklistForFailedTask(anyString(), anyString(), anyInt(), anyString())
   }
 
   test("update application blacklist for shuffle-fetch") {

