You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/03/19 20:47:02 UTC

git commit: Bugfixes/improvements to scheduler

Repository: spark
Updated Branches:
  refs/heads/master 6112270c9 -> ab747d39d


Bugfixes/improvements to scheduler

Move PR #517 from apache-incubator-spark to apache-spark

Author: Mridul Muralidharan <mr...@gmail.com>

Closes #159 from mridulm/master and squashes the following commits:

5ff59c2 [Mridul Muralidharan] Change property in suite also
167fad8 [Mridul Muralidharan] Address review comments
9bda70e [Mridul Muralidharan] Address review comments, always add to failedExecutors
270d841 [Mridul Muralidharan] Address review comments
fa5d9f1 [Mridul Muralidharan] Bugfixes/improvements to scheduler : PR #517


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab747d39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab747d39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab747d39

Branch: refs/heads/master
Commit: ab747d39ddc7c8a314ed2fb26548fc5652af0d74
Parents: 6112270
Author: Mridul Muralidharan <mr...@gmail.com>
Authored: Wed Mar 19 12:46:55 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Wed Mar 19 12:46:55 2014 -0700

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  3 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 82 +++++++++++++-----
 .../spark/scheduler/TaskSetManagerSuite.scala   | 87 ++++++++++++++++++++
 3 files changed, 152 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab747d39/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 23b0661..abff252 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -235,7 +235,8 @@ private[spark] class TaskSchedulerImpl(
             taskIdToExecutorId(tid) = execId
             activeExecutorIds += execId
             executorsByHost(host) += execId
-            availableCpus(i) -= 1
+            availableCpus(i) -= taskSet.CPUS_PER_TASK
+            assert (availableCpus(i) >= 0)
             launchedTask = true
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/ab747d39/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5ea4557..a73343c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -59,6 +59,15 @@ private[spark] class TaskSetManager(
   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
 
+  /*
+   * Sometimes if an executor is dead or in an otherwise invalid state, the driver
+   * does not realize it right away, leading to repeated task failures. If enabled (value
+   * greater than 0; the default of 0 disables it), this temporarily prevents a task from
+   * re-launching on an executor where it just failed.
+   */
+  private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
+    conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
+
   // Quantile of tasks at which to start speculation
   val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
   val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
@@ -71,7 +80,9 @@ private[spark] class TaskSetManager(
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
   val successful = new Array[Boolean](numTasks)
-  val numFailures = new Array[Int](numTasks)
+  private val numFailures = new Array[Int](numTasks)
+  // key is task index (not TID), value is a Map of executor id to when it failed
+  private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksSuccessful = 0
 
@@ -228,12 +239,18 @@ private[spark] class TaskSetManager(
    * This method also cleans up any tasks in the list that have already
    * been launched, since we want that to happen lazily.
    */
-  private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
-    while (!list.isEmpty) {
-      val index = list.last
-      list.trimEnd(1)
-      if (copiesRunning(index) == 0 && !successful(index)) {
-        return Some(index)
+  private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
+    var indexOffset = list.size
+
+    while (indexOffset > 0) {
+      indexOffset -= 1
+      val index = list(indexOffset)
+      if (!executorIsBlacklisted(execId, index)) {
+        // This should almost always be list.trimEnd(1) to remove tail
+        list.remove(indexOffset)
+        if (copiesRunning(index) == 0 && !successful(index)) {
+          return Some(index)
+        }
       }
     }
     None
@@ -245,6 +262,21 @@ private[spark] class TaskSetManager(
   }
 
   /**
+   * Returns true if this task previously failed on the given executor and less than
+   * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed since that failure.
+   */
+  private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
+    if (failedExecutors.contains(taskId)) {
+      val failed = failedExecutors.get(taskId).get
+
+      return failed.contains(execId) &&
+        clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+    }
+
+    false
+  }
+
+  /**
    * Return a speculative task for a given executor if any are available. The task should not have
    * an attempt running on this host, in case the host is slow. In addition, the task should meet
    * the given locality constraint.
@@ -254,10 +286,13 @@ private[spark] class TaskSetManager(
   {
     speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
 
+    def canRunOnHost(index: Int): Boolean =
+      !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
+
     if (!speculatableTasks.isEmpty) {
       // Check for process-local or preference-less tasks; note that tasks can be process-local
       // on multiple nodes when we replicate cached blocks, as in Spark Streaming
-      for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+      for (index <- speculatableTasks if canRunOnHost(index)) {
         val prefs = tasks(index).preferredLocations
         val executors = prefs.flatMap(_.executorId)
         if (prefs.size == 0 || executors.contains(execId)) {
@@ -268,7 +303,7 @@ private[spark] class TaskSetManager(
 
       // Check for node-local tasks
       if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
-        for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+        for (index <- speculatableTasks if canRunOnHost(index)) {
           val locations = tasks(index).preferredLocations.map(_.host)
           if (locations.contains(host)) {
             speculatableTasks -= index
@@ -280,7 +315,7 @@ private[spark] class TaskSetManager(
       // Check for rack-local tasks
       if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
         for (rack <- sched.getRackForHost(host)) {
-          for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+          for (index <- speculatableTasks if canRunOnHost(index)) {
             val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
             if (racks.contains(rack)) {
               speculatableTasks -= index
@@ -292,7 +327,7 @@ private[spark] class TaskSetManager(
 
       // Check for non-local tasks
       if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
-        for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+        for (index <- speculatableTasks if canRunOnHost(index)) {
           speculatableTasks -= index
           return Some((index, TaskLocality.ANY))
         }
@@ -309,12 +344,12 @@ private[spark] class TaskSetManager(
   private def findTask(execId: String, host: String, locality: TaskLocality.Value)
     : Option[(Int, TaskLocality.Value)] =
   {
-    for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
+    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
       return Some((index, TaskLocality.PROCESS_LOCAL))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
-      for (index <- findTaskFromList(getPendingTasksForHost(host))) {
+      for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
         return Some((index, TaskLocality.NODE_LOCAL))
       }
     }
@@ -322,19 +357,19 @@ private[spark] class TaskSetManager(
     if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
       for {
         rack <- sched.getRackForHost(host)
-        index <- findTaskFromList(getPendingTasksForRack(rack))
+        index <- findTaskFromList(execId, getPendingTasksForRack(rack))
       } {
         return Some((index, TaskLocality.RACK_LOCAL))
       }
     }
 
     // Look for no-pref tasks after rack-local tasks since they can run anywhere.
-    for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
+    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
       return Some((index, TaskLocality.PROCESS_LOCAL))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
-      for (index <- findTaskFromList(allPendingTasks)) {
+      for (index <- findTaskFromList(execId, allPendingTasks)) {
         return Some((index, TaskLocality.ANY))
       }
     }
@@ -460,6 +495,7 @@ private[spark] class TaskSetManager(
       logInfo("Ignorning task-finished event for TID " + tid + " because task " +
         index + " has already completed successfully")
     }
+    failedExecutors.remove(index)
     maybeFinishTaskSet()
   }
 
@@ -480,7 +516,7 @@ private[spark] class TaskSetManager(
       logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
     }
     var taskMetrics : TaskMetrics = null
-    var failureReason = "unknown"
+    var failureReason: String = null
     reason match {
       case fetchFailed: FetchFailed =>
         logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
@@ -488,9 +524,11 @@ private[spark] class TaskSetManager(
           successful(index) = true
           tasksSuccessful += 1
         }
+        // NOTE(review): failedExecutors is still updated after this match, even for FetchFailed.
         isZombie = true
 
       case TaskKilled =>
+        // NOTE(review): failedExecutors is still updated after this match, even for TaskKilled.
         logWarning("Task %d was killed.".format(tid))
 
       case ef: ExceptionFailure =>
@@ -504,7 +542,8 @@ private[spark] class TaskSetManager(
           return
         }
         val key = ef.description
-        failureReason = "Exception failure: %s".format(ef.description)
+        failureReason = "Exception failure in TID %s on host %s: %s".format(
+          tid, info.host, ef.description)
         val now = clock.getTime()
         val (printFull, dupCount) = {
           if (recentExceptions.contains(key)) {
@@ -533,11 +572,16 @@ private[spark] class TaskSetManager(
         failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
         logWarning(failureReason)
 
-      case _ => {}
+      case _ =>
+        failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
     }
+    // always add to failed executors
+    failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
+      put(info.executorId, clock.getTime())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
     addPendingTask(index)
     if (!isZombie && state != TaskState.KILLED) {
+      assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
         logError("Task %s:%d failed %d times; aborting job".format(

http://git-wip-us.apache.org/repos/asf/spark/blob/ab747d39/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 33cc758..73153d2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -298,6 +298,93 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     }
   }
 
+  test("executors should be blacklisted after task failure, in spite of locality preferences") {
+    val rescheduleDelay = 300L
+    val conf = new SparkConf().
+      set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString).
+      // don't wait to jump locality levels in this test
+      set("spark.locality.wait", "0")
+
+    sc = new SparkContext("local", "test", conf)
+    // two executors on same host, one on different.
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec1.1", "host1"), ("exec2", "host2"))
+    // affinity to exec1 on host1 - which we will fail.
+    val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, 4, clock)
+
+    {
+      val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
+      assert(offerResult.isDefined, "Expect resource offer to return a task")
+
+      assert(offerResult.get.index === 0)
+      assert(offerResult.get.executorId === "exec1")
+
+      // Cause exec1 to fail : failure 1
+      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
+      assert(!sched.taskSetsFailed.contains(taskSet.id))
+
+      // Ensure scheduling on exec1 fails after failure 1 due to blacklist
+      assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
+    }
+
+    // Run the task on exec1.1 - should work, and then fail it on exec1.1
+    {
+      val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
+      assert(offerResult.isDefined,
+        "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
+
+      assert(offerResult.get.index === 0)
+      assert(offerResult.get.executorId === "exec1.1")
+
+      // Cause exec1.1 to fail : failure 2
+      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
+      assert(!sched.taskSetsFailed.contains(taskSet.id))
+
+      // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
+      assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
+    }
+
+    // Run the task on exec2 - should work, and then fail it on exec2
+    {
+      val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
+      assert(offerResult.isDefined, "Expect resource offer to return a task")
+
+      assert(offerResult.get.index === 0)
+      assert(offerResult.get.executorId === "exec2")
+
+      // Cause exec2 to fail : failure 3
+      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
+      assert(!sched.taskSetsFailed.contains(taskSet.id))
+
+      // Ensure scheduling on exec2 fails after failure 3 due to blacklist
+      assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
+    }
+
+    // After reschedule delay, scheduling on exec1 should be possible.
+    clock.advance(rescheduleDelay)
+
+    {
+      val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
+      assert(offerResult.isDefined, "Expect resource offer to return a task")
+
+      assert(offerResult.get.index === 0)
+      assert(offerResult.get.executorId === "exec1")
+
+      assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
+
+      // Cause exec1 to fail : failure 4
+      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
+    }
+
+    // we have failed the same task 4 times now : task set id should now be in taskSetsFailed
+    assert(sched.taskSetsFailed.contains(taskSet.id))
+  }
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)