Posted to commits@spark.apache.org by ir...@apache.org on 2016/10/12 21:43:16 UTC

spark git commit: [SPARK-17675][CORE] Expand Blacklist for TaskSets

Repository: spark
Updated Branches:
  refs/heads/master 47776e7c0 -> 9ce7d3e54


[SPARK-17675][CORE] Expand Blacklist for TaskSets

## What changes were proposed in this pull request?

This is a step along the way to SPARK-8425.

To enable incremental review, the first step proposed here is to expand the blacklisting within tasksets. In particular, this will enable blacklisting for:
* (task, executor) pairs (this already exists via an undocumented config)
* (task, node)
* (taskset, executor)
* (taskset, node)

Adding (task, node) is critical to making Spark fault-tolerant to a single bad disk in a cluster without requiring careful tuning of "spark.task.maxFailures". The other additions are also important for avoiding many misleading task failures and long scheduling delays when there is one bad node in a large cluster.
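
As a rough sketch of what this enables, the new keys added in `config/package.scala` below could be set like this (the values shown are the defaults introduced by this patch, not a recommendation):

```scala
import org.apache.spark.SparkConf

// Sketch only: turn on the new taskset-level blacklisting and spell out its defaults.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")                       // off unless the legacy conf is set
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")  // (task, executor)
  .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")      // (task, node)
  .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")  // (taskset, executor)
  .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "2")  // (taskset, node)
  .set("spark.task.maxFailures", "4")                           // must stay above maxTaskAttemptsPerNode
```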

Note that some of the code changes here aren't really required for just this -- they put pieces in place for SPARK-8425 even though they are not used yet (e.g., the `BlacklistTracker` helper is a little out of place, `TaskSetBlacklist` holds onto a little more info than it needs to for just this change, and `ExecutorFailuresInTaskSet` is more complex than it needs to be).

## How was this patch tested?

Added unit tests; ran tests via Jenkins.

Author: Imran Rashid <ir...@cloudera.com>
Author: mwws <we...@intel.com>

Closes #15249 from squito/taskset_blacklist_only.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ce7d3e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ce7d3e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ce7d3e5

Branch: refs/heads/master
Commit: 9ce7d3e542e786c62f047c13f3001e178f76e06a
Parents: 47776e7
Author: Imran Rashid <ir...@cloudera.com>
Authored: Wed Oct 12 16:43:03 2016 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Wed Oct 12 16:43:03 2016 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |   4 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |  11 +
 .../apache/spark/internal/config/package.scala  |  45 +++
 .../spark/scheduler/BlacklistTracker.scala      | 114 ++++++++
 .../scheduler/ExecutorFailuresInTaskSet.scala   |  50 ++++
 .../spark/scheduler/TaskSchedulerImpl.scala     |  31 +--
 .../spark/scheduler/TaskSetBlacklist.scala      | 128 +++++++++
 .../apache/spark/scheduler/TaskSetManager.scala | 276 ++++++++++---------
 .../scheduler/BlacklistIntegrationSuite.scala   |  52 ++--
 .../spark/scheduler/BlacklistTrackerSuite.scala |  81 ++++++
 .../scheduler/SchedulerIntegrationSuite.scala   |   4 +-
 .../scheduler/TaskSchedulerImplSuite.scala      |  22 +-
 .../spark/scheduler/TaskSetBlacklistSuite.scala | 163 +++++++++++
 .../spark/scheduler/TaskSetManagerSuite.scala   | 131 ++++++++-
 .../KryoSerializerDistributedSuite.scala        |   4 +-
 docs/configuration.md                           |  43 +++
 .../sql/execution/ui/SQLListenerSuite.scala     |   3 +-
 17 files changed, 964 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 51a699f4..c9c342d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging {
         "Please use spark.kryoserializer.buffer instead. The default value for " +
           "spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
           "are no longer accepted. To specify the equivalent now, one may use '64k'."),
-      DeprecatedConfig("spark.rpc", "2.0", "Not used any more.")
+      DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
+      DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
+        "Please use the new blacklisting options, spark.blacklist.*")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 4269084..7ca3c10 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -92,6 +92,16 @@ case class FetchFailed(
     s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
       s"message=\n$message\n)"
   }
+
+  /**
+   * Fetch failures lead to a different failure handling path: (1) we don't abort the stage after
+   * 4 task failures; instead we immediately go back to the stage which generated the map output,
+   * and regenerate the missing data.  (2) we don't count fetch failures for blacklisting, since
+   * presumably it's not the fault of the executor where the task ran, but of the executor which
+   * stored the data. This is especially important because we might rack up a bunch of
+   * fetch failures in rapid succession, on all nodes of the cluster, due to one bad node.
+   */
+  override def countTowardsTaskFailures: Boolean = false
 }
 
 /**
@@ -204,6 +214,7 @@ case object TaskResultLost extends TaskFailedReason {
 @DeveloperApi
 case object TaskKilled extends TaskFailedReason {
   override def toErrorString: String = "TaskKilled (killed intentionally)"
+  override def countTowardsTaskFailures: Boolean = false
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 517fc3e..497ca92 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.internal
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.util.Utils
@@ -91,6 +93,49 @@ package object config {
     .toSequence
     .createWithDefault(Nil)
 
+  private[spark] val MAX_TASK_FAILURES =
+    ConfigBuilder("spark.task.maxFailures")
+      .intConf
+      .createWithDefault(4)
+
+  // Blacklist confs
+  private[spark] val BLACKLIST_ENABLED =
+    ConfigBuilder("spark.blacklist.enabled")
+      .booleanConf
+      .createOptional
+
+  private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
+    ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor")
+      .intConf
+      .createWithDefault(1)
+
+  private[spark] val MAX_TASK_ATTEMPTS_PER_NODE =
+    ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode")
+      .intConf
+      .createWithDefault(2)
+
+  private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
+    ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
+      .intConf
+      .createWithDefault(2)
+
+  private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
+    ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
+      .intConf
+      .createWithDefault(2)
+
+  private[spark] val BLACKLIST_TIMEOUT_CONF =
+    ConfigBuilder("spark.blacklist.timeout")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
+    ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
+      .internal()
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+  // End blacklist confs
+
   private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
       .intConf

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
new file mode 100644
index 0000000..fca4c6d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.util.Utils
+
+private[scheduler] object BlacklistTracker extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
+
+  /**
+   * Returns true if the blacklist is enabled, based on checking the configuration in the following
+   * order:
+   * 1. Is it specifically enabled or disabled?
+   * 2. Is it enabled via the legacy timeout conf?
+   * 3. Default is off
+   */
+  def isBlacklistEnabled(conf: SparkConf): Boolean = {
+    conf.get(config.BLACKLIST_ENABLED) match {
+      case Some(enabled) =>
+        enabled
+      case None =>
+        // if they've got a non-zero setting for the legacy conf, always enable the blacklist;
+        // otherwise, use the default.
+        val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
+        conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).exists { legacyTimeout =>
+          if (legacyTimeout == 0) {
+            logWarning(s"Turning off blacklisting due to legacy configuration: $legacyKey == 0")
+            false
+          } else {
+            logWarning(s"Turning on blacklisting due to legacy configuration: $legacyKey > 0")
+            true
+          }
+        }
+    }
+  }
+
+  def getBlacklistTimeout(conf: SparkConf): Long = {
+    conf.get(config.BLACKLIST_TIMEOUT_CONF).getOrElse {
+      conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).getOrElse {
+        Utils.timeStringAsMs(DEFAULT_TIMEOUT)
+      }
+    }
+  }
+
+  /**
+   * Verify that blacklist configurations are consistent; if not, throw an exception.  Should only
+   * be called if blacklisting is enabled.
+   *
+   * The configuration for the blacklist is expected to adhere to a few invariants.  Default
+   * values follow these rules of course, but users may unwittingly change one configuration
+   * without making the corresponding adjustment elsewhere.  This ensures we fail-fast when
+   * there are such misconfigurations.
+   */
+  def validateBlacklistConfs(conf: SparkConf): Unit = {
+
+    def mustBePos(k: String, v: String): Unit = {
+      throw new IllegalArgumentException(s"$k was $v, but must be > 0.")
+    }
+
+    Seq(
+      config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
+      config.MAX_TASK_ATTEMPTS_PER_NODE,
+      config.MAX_FAILURES_PER_EXEC_STAGE,
+      config.MAX_FAILED_EXEC_PER_NODE_STAGE
+    ).foreach { config =>
+      val v = conf.get(config)
+      if (v <= 0) {
+        mustBePos(config.key, v.toString)
+      }
+    }
+
+    val timeout = getBlacklistTimeout(conf)
+    if (timeout <= 0) {
+      // first, figure out where the timeout came from, to include the right conf in the message.
+      conf.get(config.BLACKLIST_TIMEOUT_CONF) match {
+        case Some(t) =>
+          mustBePos(config.BLACKLIST_TIMEOUT_CONF.key, timeout.toString)
+        case None =>
+          mustBePos(config.BLACKLIST_LEGACY_TIMEOUT_CONF.key, timeout.toString)
+      }
+    }
+
+    val maxTaskFailures = conf.get(config.MAX_TASK_FAILURES)
+    val maxNodeAttempts = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
+
+    if (maxNodeAttempts >= maxTaskFailures) {
+      throw new IllegalArgumentException(s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
+        s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
+        s"( = ${maxTaskFailures} ).  Though blacklisting is enabled, with this configuration, " +
+        s"Spark will not be robust to one bad node.  Decrease " +
+        s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
+        s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
new file mode 100644
index 0000000..20ab27d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.HashMap
+
+/**
+ * Small helper for tracking failed tasks for blacklisting purposes.  Info on all failures on one
+ * executor, within one task set.
+ */
+private[scheduler] class ExecutorFailuresInTaskSet(val node: String) {
+  /**
+   * Mapping from the index of a task in the taskset to the number of times it has failed on this
+   * executor.
+   */
+  val taskToFailureCount = HashMap[Int, Int]()
+
+  def updateWithFailure(taskIndex: Int): Unit = {
+    val prevFailureCount = taskToFailureCount.getOrElse(taskIndex, 0)
+    taskToFailureCount(taskIndex) = prevFailureCount + 1
+  }
+
+  def numUniqueTasksWithFailures: Int = taskToFailureCount.size
+
+  /**
+   * Return the number of times this executor has failed on the given task index.
+   */
+  def getNumTaskFailures(index: Int): Int = {
+    taskToFailureCount.getOrElse(index, 0)
+  }
+
+  override def toString(): String = {
+    s"numUniqueTasksWithFailures = $numUniqueTasksWithFailures; " +
+      s"tasksToFailureCount = $taskToFailureCount"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0ad4730..3e3f1ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -22,14 +22,14 @@ import java.util.{Timer, TimerTask}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.Set
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -57,7 +57,7 @@ private[spark] class TaskSchedulerImpl(
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
-  def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
+  def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
 
   val conf = sc.conf
 
@@ -100,7 +100,7 @@ private[spark] class TaskSchedulerImpl(
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
-  protected val executorsByHost = new HashMap[String, HashSet[String]]
+  protected val hostToExecutors = new HashMap[String, HashSet[String]]
 
   protected val hostsByRack = new HashMap[String, HashSet[String]]
 
@@ -243,8 +243,8 @@ private[spark] class TaskSchedulerImpl(
       }
     }
     manager.parent.removeSchedulable(manager)
-    logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s"
-      .format(manager.taskSet.id, manager.parent.name))
+    logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
+      s" ${manager.parent.name}")
   }
 
   private def resourceOfferSingleTaskSet(
@@ -291,11 +291,11 @@ private[spark] class TaskSchedulerImpl(
     // Also track if new executor is added
     var newExecAvail = false
     for (o <- offers) {
-      if (!executorsByHost.contains(o.host)) {
-        executorsByHost(o.host) = new HashSet[String]()
+      if (!hostToExecutors.contains(o.host)) {
+        hostToExecutors(o.host) = new HashSet[String]()
       }
       if (!executorIdToTaskCount.contains(o.executorId)) {
-        executorsByHost(o.host) += o.executorId
+        hostToExecutors(o.host) += o.executorId
         executorAdded(o.executorId, o.host)
         executorIdToHost(o.executorId) = o.host
         executorIdToTaskCount(o.executorId) = 0
@@ -334,7 +334,7 @@ private[spark] class TaskSchedulerImpl(
         } while (launchedTaskAtCurrentMaxLocality)
       }
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
       }
     }
 
@@ -542,10 +542,10 @@ private[spark] class TaskSchedulerImpl(
     executorIdToTaskCount -= executorId
 
     val host = executorIdToHost(executorId)
-    val execs = executorsByHost.getOrElse(host, new HashSet)
+    val execs = hostToExecutors.getOrElse(host, new HashSet)
     execs -= executorId
     if (execs.isEmpty) {
-      executorsByHost -= host
+      hostToExecutors -= host
       for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
         hosts -= host
         if (hosts.isEmpty) {
@@ -565,11 +565,11 @@ private[spark] class TaskSchedulerImpl(
   }
 
   def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
-    executorsByHost.get(host).map(_.toSet)
+    hostToExecutors.get(host).map(_.toSet)
   }
 
   def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
-    executorsByHost.contains(host)
+    hostToExecutors.contains(host)
   }
 
   def hasHostAliveOnRack(rack: String): Boolean = synchronized {
@@ -662,5 +662,4 @@ private[spark] object TaskSchedulerImpl {
 
     retval.toList
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
new file mode 100644
index 0000000..f4b0f55
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.{HashMap, HashSet}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.util.Clock
+
+/**
+ * Handles blacklisting executors and nodes within a taskset.  This includes blacklisting specific
+ * (task, executor) / (task, node) pairs, and also completely blacklisting executors and nodes
+ * for the entire taskset.
+ *
+ * THREADING:  This class is a helper to [[TaskSetManager]]; as with the methods in
+ * [[TaskSetManager]] this class is designed only to be called from code with a lock on the
+ * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
+ */
+private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, val clock: Clock)
+    extends Logging {
+
+  private val MAX_TASK_ATTEMPTS_PER_EXECUTOR = conf.get(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR)
+  private val MAX_TASK_ATTEMPTS_PER_NODE = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
+  private val MAX_FAILURES_PER_EXEC_STAGE = conf.get(config.MAX_FAILURES_PER_EXEC_STAGE)
+  private val MAX_FAILED_EXEC_PER_NODE_STAGE = conf.get(config.MAX_FAILED_EXEC_PER_NODE_STAGE)
+
+  /**
+   * A map from each executor to the task failures on that executor.
+   */
+  val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()
+
+  /**
+   * Map from node to all executors on it with failures.  Needed because we want to know about
+   * executors on a node even after they have died. (We don't want to bother tracking the
+   * node -> execs mapping in the usual case when there aren't any failures).
+   */
+  private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
+  private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
+  private val blacklistedExecs = new HashSet[String]()
+  private val blacklistedNodes = new HashSet[String]()
+
+  /**
+   * Return true if this executor is blacklisted for the given task.  This does *not*
+   * need to return true if the executor is blacklisted for the entire stage.
+   * That is to keep this method as fast as possible in the inner-loop of the
+   * scheduler, where those filters will have already been applied.
+   */
+  def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
+    execToFailures.get(executorId).exists { execFailures =>
+      execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
+    }
+  }
+
+  def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {
+    nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index))
+  }
+
+  /**
+   * Return true if this executor is blacklisted for the given stage.  Completely ignores
+   * anything to do with the node the executor is on.  That
+   * is to keep this method as fast as possible in the inner-loop of the scheduler, where those
+   * filters will already have been applied.
+   */
+  def isExecutorBlacklistedForTaskSet(executorId: String): Boolean = {
+    blacklistedExecs.contains(executorId)
+  }
+
+  def isNodeBlacklistedForTaskSet(node: String): Boolean = {
+    blacklistedNodes.contains(node)
+  }
+
+  private[scheduler] def updateBlacklistForFailedTask(
+      host: String,
+      exec: String,
+      index: Int): Unit = {
+    val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
+    execFailures.updateWithFailure(index)
+
+    // check if this task has also failed on other executors on the same host -- if it's gone
+    // over the limit, blacklist this task from the entire host.
+    val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
+    execsWithFailuresOnNode += exec
+    val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
+      execToFailures.get(exec).map { failures =>
+        // We count task attempts here, not the number of unique executors with failures.  This is
+        // because jobs are aborted based on the number of task attempts; if we counted unique
+        // executors, it would be hard to configure the settings to ensure that you try another
+        // node before hitting the max number of task failures.
+        failures.getNumTaskFailures(index)
+      }
+    }.sum
+    if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
+      nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
+    }
+
+    // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
+    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+      if (blacklistedExecs.add(exec)) {
+        logInfo(s"Blacklisting executor ${exec} for stage $stageId")
+        // This executor has been pushed into the blacklist for this stage.  Let's check if it
+        // pushes the whole node into the blacklist.
+        val blacklistedExecutorsOnNode =
+          execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+        if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
+          if (blacklistedNodes.add(host)) {
+            logInfo(s"Blacklisting ${host} for stage $stageId")
+          }
+        }
+      }
+    }
+  }
+}
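
For intuition, here is a minimal sketch of how a task failure flows through this helper (in the spirit of the new `TaskSetBlacklistSuite`; it assumes the default confs and access to the `private[scheduler]` API, so it is illustrative rather than user-facing code):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.SystemClock

// Sketch only: one task (index 0) failing repeatedly on one executor.
val conf = new SparkConf().set("spark.blacklist.enabled", "true")
val blacklist = new TaskSetBlacklist(conf, stageId = 0, clock = new SystemClock())

// First failure on exec1/hostA: with maxTaskAttemptsPerExecutor = 1 (default),
// (task 0, exec1) is now blacklisted, but the node is not yet.
blacklist.updateBlacklistForFailedTask("hostA", "exec1", index = 0)
assert(blacklist.isExecutorBlacklistedForTask("exec1", 0))
assert(!blacklist.isNodeBlacklistedForTask("hostA", 0))

// A second failure of the same task on the same host reaches
// maxTaskAttemptsPerNode = 2 (default), so (task 0, hostA) is blacklisted too.
blacklist.updateBlacklistForFailedTask("hostA", "exec1", index = 0)
assert(blacklist.isNodeBlacklistedForTask("hostA", 0))
```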

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 226bed2..9491bc7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -22,9 +22,7 @@ import java.nio.ByteBuffer
 import java.util.Arrays
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.math.{max, min}
 import scala.util.control.NonFatal
 
@@ -53,19 +51,9 @@ private[spark] class TaskSetManager(
     sched: TaskSchedulerImpl,
     val taskSet: TaskSet,
     val maxTaskFailures: Int,
-    clock: Clock = new SystemClock())
-  extends Schedulable with Logging {
+    clock: Clock = new SystemClock()) extends Schedulable with Logging {
 
-  val conf = sched.sc.conf
-
-  /*
-   * Sometimes if an executor is dead or in an otherwise invalid state, the driver
-   * does not realize right away leading to repeated task failures. If enabled,
-   * this temporarily prevents a task from re-launching on an executor where
-   * it just failed.
-   */
-  private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
-    conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
+  private val conf = sched.sc.conf
 
   // Quantile of tasks at which to start speculation
   val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
@@ -83,8 +71,6 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
-  // key is taskId (aka TaskInfo.index), value is a Map of executor id to when it failed
-  private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksSuccessful = 0
@@ -98,6 +84,14 @@ private[spark] class TaskSetManager(
   var totalResultSize = 0L
   var calculatedTasks = 0
 
+  private val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
+    if (BlacklistTracker.isBlacklistEnabled(conf)) {
+      Some(new TaskSetBlacklist(conf, stageId, clock))
+    } else {
+      None
+    }
+  }
+
   val runningTasksSet = new HashSet[Long]
 
   override def runningTasks: Int = runningTasksSet.size
@@ -245,12 +239,15 @@ private[spark] class TaskSetManager(
    * This method also cleans up any tasks in the list that have already
    * been launched, since we want that to happen lazily.
    */
-  private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
+  private def dequeueTaskFromList(
+      execId: String,
+      host: String,
+      list: ArrayBuffer[Int]): Option[Int] = {
     var indexOffset = list.size
     while (indexOffset > 0) {
       indexOffset -= 1
       val index = list(indexOffset)
-      if (!executorIsBlacklisted(execId, index)) {
+      if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {
         // This should almost always be list.trimEnd(1) to remove tail
         list.remove(indexOffset)
         if (copiesRunning(index) == 0 && !successful(index)) {
@@ -266,19 +263,11 @@ private[spark] class TaskSetManager(
     taskAttempts(taskIndex).exists(_.host == host)
   }
 
-  /**
-   * Is this re-execution of a failed task on an executor it already failed in before
-   * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
-   */
-  private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
-    if (failedExecutors.contains(taskId)) {
-      val failed = failedExecutors.get(taskId).get
-
-      return failed.contains(execId) &&
-        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
+  private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = {
+    taskSetBlacklistHelperOpt.exists { blacklist =>
+      blacklist.isNodeBlacklistedForTask(host, index) ||
+        blacklist.isExecutorBlacklistedForTask(execId, index)
     }
-
-    false
   }
 
   /**
@@ -292,8 +281,10 @@ private[spark] class TaskSetManager(
   {
     speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
 
-    def canRunOnHost(index: Int): Boolean =
-      !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
+    def canRunOnHost(index: Int): Boolean = {
+      !hasAttemptOnHost(index, host) &&
+        !isTaskBlacklistedOnExecOrNode(index, execId, host)
+    }
 
     if (!speculatableTasks.isEmpty) {
       // Check for process-local tasks; note that tasks can be process-local
@@ -366,19 +357,19 @@ private[spark] class TaskSetManager(
   private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
     : Option[(Int, TaskLocality.Value, Boolean)] =
   {
-    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
+    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
       return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
-      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
+      for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
         return Some((index, TaskLocality.NODE_LOCAL, false))
       }
     }
 
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
       // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
-      for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
+      for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
         return Some((index, TaskLocality.PROCESS_LOCAL, false))
       }
     }
@@ -386,14 +377,14 @@ private[spark] class TaskSetManager(
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
       for {
         rack <- sched.getRackForHost(host)
-        index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
+        index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
       } {
         return Some((index, TaskLocality.RACK_LOCAL, false))
       }
     }
 
     if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
-      for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
+      for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
         return Some((index, TaskLocality.ANY, false))
       }
     }
@@ -421,7 +412,11 @@ private[spark] class TaskSetManager(
       maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
-    if (!isZombie) {
+    val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
+      blacklist.isNodeBlacklistedForTaskSet(host) ||
+        blacklist.isExecutorBlacklistedForTaskSet(execId)
+    }
+    if (!isZombie && !offerBlacklisted) {
       val curTime = clock.getTimeMillis()
 
       var allowedLocality = maxLocality
@@ -434,60 +429,59 @@ private[spark] class TaskSetManager(
         }
       }
 
-      dequeueTask(execId, host, allowedLocality) match {
-        case Some((index, taskLocality, speculative)) =>
-          // Found a task; do some bookkeeping and return a task description
-          val task = tasks(index)
-          val taskId = sched.newTaskId()
-          // Do various bookkeeping
-          copiesRunning(index) += 1
-          val attemptNum = taskAttempts(index).size
-          val info = new TaskInfo(taskId, index, attemptNum, curTime,
-            execId, host, taskLocality, speculative)
-          taskInfos(taskId) = info
-          taskAttempts(index) = info :: taskAttempts(index)
-          // Update our locality level for delay scheduling
-          // NO_PREF will not affect the variables related to delay scheduling
-          if (maxLocality != TaskLocality.NO_PREF) {
-            currentLocalityIndex = getLocalityIndex(taskLocality)
-            lastLaunchTime = curTime
-          }
-          // Serialize and return the task
-          val startTime = clock.getTimeMillis()
-          val serializedTask: ByteBuffer = try {
-            Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
-          } catch {
-            // If the task cannot be serialized, then there's no point to re-attempt the task,
-            // as it will always fail. So just abort the whole task-set.
-            case NonFatal(e) =>
-              val msg = s"Failed to serialize task $taskId, not attempting to retry it."
-              logError(msg, e)
-              abort(s"$msg Exception during serialization: $e")
-              throw new TaskNotSerializableException(e)
-          }
-          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
-              !emittedTaskSizeWarning) {
-            emittedTaskSizeWarning = true
-            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
-              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
-              s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
-          }
-          addRunningTask(taskId)
-
-          // We used to log the time it takes to serialize the task, but task size is already
-          // a good proxy to task serialization time.
-          // val timeTaken = clock.getTime() - startTime
-          val taskName = s"task ${info.id} in stage ${taskSet.id}"
-          logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
-            s" $taskLocality, ${serializedTask.limit} bytes)")
-
-          sched.dagScheduler.taskStarted(task, info)
-          return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
-            taskName, index, serializedTask))
-        case _ =>
+      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
+        // Found a task; do some bookkeeping and return a task description
+        val task = tasks(index)
+        val taskId = sched.newTaskId()
+        // Do various bookkeeping
+        copiesRunning(index) += 1
+        val attemptNum = taskAttempts(index).size
+        val info = new TaskInfo(taskId, index, attemptNum, curTime,
+          execId, host, taskLocality, speculative)
+        taskInfos(taskId) = info
+        taskAttempts(index) = info :: taskAttempts(index)
+        // Update our locality level for delay scheduling
+        // NO_PREF will not affect the variables related to delay scheduling
+        if (maxLocality != TaskLocality.NO_PREF) {
+          currentLocalityIndex = getLocalityIndex(taskLocality)
+          lastLaunchTime = curTime
+        }
+        // Serialize and return the task
+        val startTime = clock.getTimeMillis()
+        val serializedTask: ByteBuffer = try {
+          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+        } catch {
+          // If the task cannot be serialized, then there's no point to re-attempt the task,
+          // as it will always fail. So just abort the whole task-set.
+          case NonFatal(e) =>
+            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
+            logError(msg, e)
+            abort(s"$msg Exception during serialization: $e")
+            throw new TaskNotSerializableException(e)
+        }
+        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+          !emittedTaskSizeWarning) {
+          emittedTaskSizeWarning = true
+          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
+            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
+            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+        }
+        addRunningTask(taskId)
+
+        // We used to log the time it takes to serialize the task, but task size is already
+        // a good proxy to task serialization time.
+        // val timeTaken = clock.getTime() - startTime
+        val taskName = s"task ${info.id} in stage ${taskSet.id}"
+        logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
+          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
+
+        sched.dagScheduler.taskStarted(task, info)
+        new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
+          taskName, index, serializedTask)
       }
+    } else {
+      None
     }
-    None
   }
 
   private def maybeFinishTaskSet() {
@@ -589,37 +583,56 @@ private[spark] class TaskSetManager(
    * the hang as quickly as we could have, but we'll always detect the hang eventually, and the
    * method is faster in the typical case. In the worst case, this method can take
    * O(maxTaskFailures + numTasks) time, but it will be faster when there haven't been any task
-   * failures (this is because the method picks on unscheduled task, and then iterates through each
-   * executor until it finds one that the task hasn't failed on already).
+   * failures (this is because the method picks one unscheduled task, and then iterates through each
+   * executor until it finds one that the task isn't blacklisted on).
    */
-  private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
-
-    val pendingTask: Option[Int] = {
-      // usually this will just take the last pending task, but because of the lazy removal
-      // from each list, we may need to go deeper in the list.  We poll from the end because
-      // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
-      // an unschedulable task this way.
-      val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
-        copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
-      }
-      if (indexOffset == -1) {
-        None
-      } else {
-        Some(allPendingTasks(indexOffset))
-      }
-    }
+  private[scheduler] def abortIfCompletelyBlacklisted(
+      hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
+    taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+      // Only look for unschedulable tasks when at least one executor has registered. Otherwise,
+      // task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
+      if (hostToExecutors.nonEmpty) {
+        // find any task that needs to be scheduled
+        val pendingTask: Option[Int] = {
+          // usually this will just take the last pending task, but because of the lazy removal
+          // from each list, we may need to go deeper in the list.  We poll from the end because
+          // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
+          // an unschedulable task this way.
+          val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
+            copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
+          }
+          if (indexOffset == -1) {
+            None
+          } else {
+            Some(allPendingTasks(indexOffset))
+          }
+        }
 
-    // If no executors have registered yet, don't abort the stage, just wait.  We probably
-    // got here because a task set was added before the executors registered.
-    if (executors.nonEmpty) {
-      // take any task that needs to be scheduled, and see if we can find some executor it *could*
-      // run on
-      pendingTask.foreach { taskId =>
-        if (executors.forall(executorIsBlacklisted(_, taskId))) {
-          val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
-          val partition = tasks(taskId).partitionId
-          abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
-            s" has already failed on executors $execs, and no other executors are available.")
+        pendingTask.foreach { indexInTaskSet =>
+          // try to find some executor this task can run on.  It's possible that some *other*
+          // task isn't schedulable anywhere, but we will discover that in some later call,
+          // when that unschedulable task is the last task remaining.
+          val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
+            // Check if the task can run on the node
+            val nodeBlacklisted =
+              taskSetBlacklist.isNodeBlacklistedForTaskSet(host) ||
+              taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet)
+            if (nodeBlacklisted) {
+              true
+            } else {
+              // Check if the task can run on any of the executors
+              execsOnHost.forall { exec =>
+                  taskSetBlacklist.isExecutorBlacklistedForTaskSet(exec) ||
+                  taskSetBlacklist.isExecutorBlacklistedForTask(exec, indexInTaskSet)
+              }
+            }
+          }
+          if (blacklistedEverywhere) {
+            val partition = tasks(indexInTaskSet).partitionId
+            abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
+              s"cannot run anywhere due to node and executor blacklist.  Blacklisting behavior " +
+              s"can be configured via spark.blacklist.*.")
+          }
         }
       }
     }
@@ -677,8 +690,9 @@ private[spark] class TaskSetManager(
     }
     if (!successful(index)) {
       tasksSuccessful += 1
-      logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
-        info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
+      logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
+        s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
+        s" ($tasksSuccessful/$numTasks)")
       // Mark successful and stop if all the tasks have succeeded.
       successful(index) = true
       if (tasksSuccessful == numTasks) {
@@ -688,7 +702,6 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
-    failedExecutors.remove(index)
     maybeFinishTaskSet()
   }
 
@@ -706,8 +719,8 @@ private[spark] class TaskSetManager(
     val index = info.index
     copiesRunning(index) -= 1
     var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
-    val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
-      reason.toErrorString
+    val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}," +
+      s" executor ${info.executorId}): ${reason.toErrorString}"
     val failureException: Option[Throwable] = reason match {
       case fetchFailed: FetchFailed =>
         logWarning(failureReason)
@@ -715,7 +728,6 @@ private[spark] class TaskSetManager(
           successful(index) = true
           tasksSuccessful += 1
         }
-        // Not adding to failed executors for FetchFailed.
         isZombie = true
         None
 
@@ -751,8 +763,8 @@ private[spark] class TaskSetManager(
           logWarning(failureReason)
         } else {
           logInfo(
-            s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on executor ${info.host}: " +
-            s"${ef.className} (${ef.description}) [duplicate $dupCount]")
+            s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on ${info.host}, executor" +
+              s" ${info.executorId}: ${ef.className} (${ef.description}) [duplicate $dupCount]")
         }
         ef.exception
 
@@ -766,9 +778,7 @@ private[spark] class TaskSetManager(
         logWarning(failureReason)
         None
     }
-    // always add to failed executors
-    failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
-      put(info.executorId, clock.getTimeMillis())
+
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
 
     if (successful(index)) {
@@ -780,7 +790,9 @@ private[spark] class TaskSetManager(
       addPendingTask(index)
     }
 
-    if (!isZombie && state != TaskState.KILLED && reason.countTowardsTaskFailures) {
+    if (!isZombie && reason.countTowardsTaskFailures) {
+      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
+        info.host, info.executorId, index))
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 14c8b66..f6015cd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.spark.scheduler
 
-import scala.concurrent.Await
 import scala.concurrent.duration._
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 
 class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{
 
@@ -42,7 +42,10 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
 
   // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
   // according to locality preferences, and so the job fails
-  testScheduler("If preferred node is bad, without blacklist job will fail") {
+  testScheduler("If preferred node is bad, without blacklist job will fail",
+    extraConfs = Seq(
+      config.BLACKLIST_ENABLED.key -> "false"
+  )) {
     val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
     withBackend(badHostBackend _) {
       val jobFuture = submit(rdd, (0 until 10).toArray)
@@ -51,37 +54,38 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     assertDataStructuresEmpty(noFailure = false)
   }
 
-  // even with the blacklist turned on, if maxTaskFailures is not more than the number
-  // of executors on the bad node, then locality preferences will lead to us cycling through
-  // the executors on the bad node, and still failing the job
   testScheduler(
-    "With blacklist on, job will still fail if there are too many bad executors on bad host",
+    "With default settings, job can succeed despite multiple bad executors on node",
     extraConfs = Seq(
-      // set this to something much longer than the test duration so that executors don't get
-      // removed from the blacklist during the test
-      ("spark.scheduler.executorTaskBlacklistTime", "10000000")
+      config.BLACKLIST_ENABLED.key -> "true",
+      config.MAX_TASK_FAILURES.key -> "4",
+      "spark.testing.nHosts" -> "2",
+      "spark.testing.nExecutorsPerHost" -> "5",
+      "spark.testing.nCoresPerExecutor" -> "10"
     )
   ) {
-    val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
+    // To reliably reproduce the failure that would occur without blacklisting, we have to use 1
+    // task.  That way, we ensure this 1 task gets rotated through enough bad executors on the host
+    // to fail the taskSet, before enough different tasks fail on those executors that we
+    // blacklist them.
+    // But the point here is -- without blacklisting, we would never schedule anything on the good
+    // host-1 before we hit too many failures trying our preferred host-0.
+    val rdd = new MockRDDWithLocalityPrefs(sc, 1, Nil, badHost)
     withBackend(badHostBackend _) {
-      val jobFuture = submit(rdd, (0 until 10).toArray)
+      val jobFuture = submit(rdd, (0 until 1).toArray)
       awaitJobTermination(jobFuture, duration)
     }
-    assertDataStructuresEmpty(noFailure = false)
+    assertDataStructuresEmpty(noFailure = true)
   }
 
-  // Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
-  // schedule on a good node and succeed the job
+  // Here we run with the blacklist on, and the default config takes care of making this
+  // robust to one bad node.
   testScheduler(
     "Bad node with multiple executors, job will still succeed with the right confs",
     extraConfs = Seq(
-      // set this to something much longer than the test duration so that executors don't get
-      // removed from the blacklist during the test
-      ("spark.scheduler.executorTaskBlacklistTime", "10000000"),
-      // this has to be higher than the number of executors on the bad host
-      ("spark.task.maxFailures", "5"),
+       config.BLACKLIST_ENABLED.key -> "true",
       // just to avoid this test taking too long
-      ("spark.locality.wait", "10ms")
+      "spark.locality.wait" -> "10ms"
     )
   ) {
     val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
@@ -98,9 +102,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
   testScheduler(
     "SPARK-15865 Progress with fewer executors than maxTaskFailures",
     extraConfs = Seq(
-      // set this to something much longer than the test duration so that executors don't get
-      // removed from the blacklist during the test
-      "spark.scheduler.executorTaskBlacklistTime" -> "10000000",
+      config.BLACKLIST_ENABLED.key -> "true",
       "spark.testing.nHosts" -> "2",
       "spark.testing.nExecutorsPerHost" -> "1",
       "spark.testing.nCoresPerExecutor" -> "1"
@@ -112,9 +114,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     }
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
-      Await.ready(jobFuture, duration)
+      awaitJobTermination(jobFuture, duration)
       val pattern = ("Aborting TaskSet 0.0 because task .* " +
-        "already failed on executors \\(.*\\), and no other executors are available").r
+        "cannot run anywhere due to node and executor blacklist").r
       assert(pattern.findFirstIn(failure.getMessage).isDefined,
         s"Couldn't find $pattern in ${failure.getMessage()}")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
new file mode 100644
index 0000000..b2e7ec5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config
+
+class BlacklistTrackerSuite extends SparkFunSuite {
+
+  test("blacklist still respects legacy configs") {
+    val conf = new SparkConf().setMaster("local")
+    assert(!BlacklistTracker.isBlacklistEnabled(conf))
+    conf.set(config.BLACKLIST_LEGACY_TIMEOUT_CONF, 5000L)
+    assert(BlacklistTracker.isBlacklistEnabled(conf))
+    assert(5000 === BlacklistTracker.getBlacklistTimeout(conf))
+    // the new conf takes precedence, though
+    conf.set(config.BLACKLIST_TIMEOUT_CONF, 1000L)
+    assert(1000 === BlacklistTracker.getBlacklistTimeout(conf))
+
+    // if you explicitly set the legacy conf to 0, that would also disable blacklisting
+    conf.set(config.BLACKLIST_LEGACY_TIMEOUT_CONF, 0L)
+    assert(!BlacklistTracker.isBlacklistEnabled(conf))
+    // but again, the new conf takes precedence
+    conf.set(config.BLACKLIST_ENABLED, true)
+    assert(BlacklistTracker.isBlacklistEnabled(conf))
+    assert(1000 === BlacklistTracker.getBlacklistTimeout(conf))
+  }
+
+  test("check blacklist configuration invariants") {
+    val conf = new SparkConf().setMaster("yarn-cluster")
+    Seq(
+      (2, 2),
+      (2, 3)
+    ).foreach { case (maxTaskFailures, maxNodeAttempts) =>
+      conf.set(config.MAX_TASK_FAILURES, maxTaskFailures)
+      conf.set(config.MAX_TASK_ATTEMPTS_PER_NODE.key, maxNodeAttempts.toString)
+      val excMsg = intercept[IllegalArgumentException] {
+        BlacklistTracker.validateBlacklistConfs(conf)
+      }.getMessage()
+      assert(excMsg === s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
+        s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
+        s"( = ${maxTaskFailures} ).  Though blacklisting is enabled, with this configuration, " +
+        s"Spark will not be robust to one bad node.  Decrease " +
+        s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
+        s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
+    }
+
+    conf.remove(config.MAX_TASK_FAILURES)
+    conf.remove(config.MAX_TASK_ATTEMPTS_PER_NODE)
+
+    Seq(
+      config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
+      config.MAX_TASK_ATTEMPTS_PER_NODE,
+      config.MAX_FAILURES_PER_EXEC_STAGE,
+      config.MAX_FAILED_EXEC_PER_NODE_STAGE,
+      config.BLACKLIST_TIMEOUT_CONF
+    ).foreach { config =>
+      conf.set(config.key, "0")
+      val excMsg = intercept[IllegalArgumentException] {
+        BlacklistTracker.validateBlacklistConfs(conf)
+      }.getMessage()
+      assert(excMsg.contains(s"${config.key} was 0, but must be > 0."))
+      conf.remove(config)
+    }
+  }
+}
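
The two tests above pin down how the legacy `spark.scheduler.executorTaskBlacklistTime` setting interacts with the new `spark.blacklist.*` keys. A minimal sketch of the precedence they assert is below; this is illustration only -- the real logic lives in `BlacklistTracker`, and the method bodies and the fallback default are assumptions, not the actual implementation.

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config

    // Sketch only: mirrors the behavior asserted in BlacklistTrackerSuite.
    object BlacklistConfSketch {
      // An explicit spark.blacklist.enabled setting always wins; otherwise any positive
      // legacy timeout turns blacklisting on.
      def isBlacklistEnabled(conf: SparkConf): Boolean =
        conf.getOption(config.BLACKLIST_ENABLED.key) match {
          case Some(explicit) => explicit.toBoolean
          case None =>
            conf.getOption(config.BLACKLIST_LEGACY_TIMEOUT_CONF.key).exists(_.toLong > 0)
        }

      // Prefer the new timeout, fall back to the legacy one, then to a default
      // (the default value used here is an assumption).
      def getBlacklistTimeout(conf: SparkConf): Long =
        conf.getOption(config.BLACKLIST_TIMEOUT_CONF.key)
          .orElse(conf.getOption(config.BLACKLIST_LEGACY_TIMEOUT_CONF.key))
          .map(_.toLong)
          .getOrElse(60 * 60 * 1000L)
    }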

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 5cd548b..c28aa06 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -620,9 +620,9 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       val duration = Duration(1, SECONDS)
       awaitJobTermination(jobFuture, duration)
     }
+    assertDataStructuresEmpty()
     assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap)
     assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1)))
-    assertDataStructuresEmpty()
   }
 
   testScheduler("job failure after 4 attempts") {
@@ -634,7 +634,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
       val duration = Duration(1, SECONDS)
       awaitJobTermination(jobFuture, duration)
-      failure.getMessage.contains("test task failure")
+      assert(failure.getMessage.contains("test task failure"))
     }
     assertDataStructuresEmpty(noFailure = false)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 61787b5..f5f1947 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
 
 class FakeSchedulerBackend extends SchedulerBackend {
@@ -32,7 +33,6 @@ class FakeSchedulerBackend extends SchedulerBackend {
 class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
     with Logging {
 
-
   var failedTaskSetException: Option[Throwable] = None
   var failedTaskSetReason: String = null
   var failedTaskSet = false
@@ -60,10 +60,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }
 
   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
     confs.foreach { case (k, v) =>
-      sc.conf.set(k, v)
+      conf.set(k, v)
     }
+    sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
     taskScheduler.initialize(new FakeSchedulerBackend)
     // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
@@ -287,9 +288,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // schedulable on another executor.  However, that executor may fail later on, leaving the
     // first task with no place to run.
     val taskScheduler = setupScheduler(
-      // set this to something much longer than the test duration so that executors don't get
-      // removed from the blacklist during the test
-      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+      config.BLACKLIST_ENABLED.key -> "true"
     )
 
     val taskSet = FakeTask.createTaskSet(2)
@@ -328,8 +327,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(tsm.isZombie)
     assert(failedTaskSet)
     val idx = failedTask.index
-    assert(failedTaskSetReason == s"Aborting TaskSet 0.0 because task $idx (partition $idx) has " +
-      s"already failed on executors (executor0), and no other executors are available.")
+    assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
+      s"cannot run anywhere due to node and executor blacklist.  Blacklisting behavior can be " +
+      s"configured via spark.blacklist.*.")
   }
 
   test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
@@ -339,9 +339,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // available and not bail on the job
 
     val taskScheduler = setupScheduler(
-      // set this to something much longer than the test duration so that executors don't get
-      // removed from the blacklist during the test
-      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+      config.BLACKLIST_ENABLED.key -> "true"
     )
 
     val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => Seq(TaskLocation("host0")) }: _*)
@@ -377,7 +375,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val taskScheduler = setupScheduler()
 
     taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
-      (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2"))}: _*
+      (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
     ))
 
     val taskDescs = taskScheduler.resourceOffers(IndexedSeq(

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
new file mode 100644
index 0000000..8c902af
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.{ManualClock, SystemClock}
+
+class TaskSetBlacklistSuite extends SparkFunSuite {
+
+  test("Blacklisting tasks, executors, and nodes") {
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set(config.BLACKLIST_ENABLED.key, "true")
+    val clock = new ManualClock
+
+    val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, clock = clock)
+    clock.setTime(0)
+    // We will mark task 0 & 1 failed on both executor 1 & 2.
+    // We should blacklist all executors on that host, for all tasks in the stage.  Note the API
+    // will return false for isExecutorBlacklistedForTaskSet even when the node is blacklisted, so
+    // the executor is only implicitly blacklisted (this makes sense with how the scheduler uses
+    // the blacklist).
+
+    // First, mark task 0 as failed on exec1.
+    // task 0 should be blacklisted on exec1, and nowhere else
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", index = 0)
+    for {
+      executor <- (1 to 4).map(_.toString)
+      index <- 0 until 10
+    } {
+      val shouldBeBlacklisted = (executor == "exec1" && index == 0)
+      assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted)
+    }
+    assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+    // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", index = 1)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", index = 0)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+    assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    // Mark another task as failed on exec2 -- now we blacklist exec2, which also leads to
+    // blacklisting the entire node.
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", index = 1)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+    assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    // Make sure the blacklist has the correct per-task && per-executor responses, over a wider
+    // range of inputs.
+    for {
+      executor <- (1 to 4).map(e => s"exec$e")
+      index <- 0 until 10
+    } {
+      withClue(s"exec = $executor; index = $index") {
+        val badExec = (executor == "exec1" || executor == "exec2")
+        val badIndex = (index == 0 || index == 1)
+        assert(
+          // this ignores whether the executor is blacklisted entirely for the taskset -- that is
+          // intentional, it keeps it fast and is sufficient for usage in the scheduler.
+          taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === (badExec && badIndex))
+        assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet(executor) === badExec)
+      }
+    }
+    assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    val execToFailures = taskSetBlacklist.execToFailures
+    assert(execToFailures.keySet === Set("exec1", "exec2"))
+
+    Seq("exec1", "exec2").foreach { exec =>
+      assert(
+        execToFailures(exec).taskToFailureCount === Map(
+          0 -> 1,
+          1 -> 1
+        )
+      )
+    }
+  }
+
+  test("multiple attempts for the same task count once") {
+    // Make sure that for task-level blacklisting, a node counts task *attempts* (across its
+    // executors), while stage-level blacklisting counts unique failed tasks.  The reason for
+    // the difference: with task-attempt blacklisting, it should be easy to configure things so
+    // that a node is blacklisted before the taskset is aborted because of spark.task.maxFailures;
+    // with stage-level blacklisting, we want to be sure we're not just counting one bad task
+    // that has failed many times.
+
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+      .set(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR, 2)
+      .set(config.MAX_TASK_ATTEMPTS_PER_NODE, 3)
+      .set(config.MAX_FAILURES_PER_EXEC_STAGE, 2)
+      .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3)
+    val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
+    // Fail a task twice on hostA, exec:1
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTask("1", 0))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
+    assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+    // Fail the same task once more on hostA, exec:2
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 0)
+    assert(taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
+    assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+    // Fail another task on hostA, exec:1.  That executor now has failures on two different tasks,
+    // so it gets blacklisted for the task set.
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+    // Fail a third task on hostA, exec:2, so that exec is blacklisted for the whole task set
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 2)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+    // Fail a fourth & fifth task on hostA, exec:3.  Now we've got three executors that are
+    // blacklisted for the taskset, so blacklist the whole node.
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 3)
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 4)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("3"))
+    assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+  }
+
+  test("only blacklist nodes for the task set when all the blacklisted executors are all on " +
+    "same host") {
+    // we blacklist executors on two different hosts within one taskSet -- make sure that doesn't
+    // lead to any node blacklisting
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set(config.BLACKLIST_ENABLED.key, "true")
+    val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
+    taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+    assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+    assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostB"))
+  }
+
+}
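
Taken together, this suite exercises the escalation rules within a task set: a task is blacklisted on an executor after spark.blacklist.task.maxTaskAttemptsPerExecutor attempts there, and on a node after spark.blacklist.task.maxTaskAttemptsPerNode attempts across that node's executors; an executor is blacklisted for the whole task set after spark.blacklist.stage.maxFailedTasksPerExecutor distinct failed tasks, and a node once spark.blacklist.stage.maxFailedExecutorsPerNode of its executors are blacklisted for the task set. A stripped-down sketch of that counting follows; names and structure are illustrative, not the real TaskSetBlacklist internals.

    import scala.collection.mutable

    // Simplified model of the per-stage counting the tests above assert.
    class TinyTaskSetBlacklist(
        maxTaskAttemptsPerExecutor: Int = 1,
        maxTaskAttemptsPerNode: Int = 2,
        maxFailedTasksPerExecutor: Int = 2,
        maxFailedExecutorsPerNode: Int = 2) {

      // per executor: task index -> number of failed attempts in this task set
      private val execToTaskFailures = mutable.Map[String, mutable.Map[Int, Int]]()
      private val execToHost = mutable.Map[String, String]()

      def taskFailed(host: String, exec: String, index: Int): Unit = {
        execToHost(exec) = host
        val counts = execToTaskFailures.getOrElseUpdate(
          exec, mutable.Map[Int, Int]().withDefaultValue(0))
        counts(index) += 1
      }

      def execBlacklistedForTask(exec: String, index: Int): Boolean =
        execToTaskFailures.get(exec).exists(_.getOrElse(index, 0) >= maxTaskAttemptsPerExecutor)

      // node-level, per-task blacklisting counts *attempts* across all executors on the host
      def nodeBlacklistedForTask(host: String, index: Int): Boolean =
        execToTaskFailures.collect {
          case (exec, counts) if execToHost(exec) == host => counts.getOrElse(index, 0)
        }.sum >= maxTaskAttemptsPerNode

      // stage-level blacklisting counts *distinct* failed tasks, not attempts
      def execBlacklistedForTaskSet(exec: String): Boolean =
        execToTaskFailures.get(exec).exists(_.size >= maxFailedTasksPerExecutor)

      def nodeBlacklistedForTaskSet(host: String): Boolean =
        execToTaskFailures.keys.count { exec =>
          execToHost(exec) == host && execBlacklistedForTaskSet(exec)
        } >= maxFailedExecutorsPerNode
    }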

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 7d6ad08..69edcf3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.mockito.Mockito.{mock, verify}
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
 
@@ -103,7 +104,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
     val host = executorIdToHost.get(execId)
     assert(host != None)
     val hostId = host.get
-    val executorsOnHost = executorsByHost(hostId)
+    val executorsOnHost = hostToExecutors(hostId)
     executorsOnHost -= execId
     for (rack <- getRackForHost(hostId); hosts <- hostsByRack.get(rack)) {
       hosts -= hostId
@@ -125,7 +126,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
 
   def addExecutor(execId: String, host: String) {
     executors.put(execId, host)
-    val executorsOnHost = executorsByHost.getOrElseUpdate(host, new mutable.HashSet[String])
+    val executorsOnHost = hostToExecutors.getOrElseUpdate(host, new mutable.HashSet[String])
     executorsOnHost += execId
     executorIdToHost += execId -> host
     for (rack <- getRackForHost(host)) {
@@ -411,7 +412,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("executors should be blacklisted after task failure, in spite of locality preferences") {
     val rescheduleDelay = 300L
     val conf = new SparkConf().
-      set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString).
+      set(config.BLACKLIST_ENABLED, true).
+      set(config.BLACKLIST_TIMEOUT_CONF, rescheduleDelay).
       // don't wait to jump locality levels in this test
       set("spark.locality.wait", "0")
 
@@ -475,19 +477,24 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
     }
 
-    // After reschedule delay, scheduling on exec1 should be possible.
+    // Even after advancing past the blacklist timeout, entries are *never* expired from the
+    // per-stage blacklist.
     clock.advance(rescheduleDelay)
 
     {
       val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
-      assert(offerResult.isDefined, "Expect resource offer to return a task")
+      assert(offerResult.isEmpty)
+    }
 
+    {
+      val offerResult = manager.resourceOffer("exec3", "host3", ANY)
+      assert(offerResult.isDefined)
       assert(offerResult.get.index === 0)
-      assert(offerResult.get.executorId === "exec1")
+      assert(offerResult.get.executorId === "exec3")
 
-      assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)
 
-      // Cause exec1 to fail : failure 4
+      // Cause exec3 to fail : failure 4
       manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
     }
 
@@ -859,6 +866,114 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(sched.endedTasks(3) === Success)
   }
 
+  test("Killing speculative tasks does not count towards aborting the taskset") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(5)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation.quantile", "0.6")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 5 tasks to start
+    val tasks = new ArrayBuffer[TaskDescription]()
+    for ((k, v) <- List(
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec2" -> "host2",
+      "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === k)
+      tasks += task
+    }
+    assert(sched.startedTasks.toSet === (0 until 5).toSet)
+    // Complete 3 tasks and leave 2 tasks in running
+    for (id <- Set(0, 1, 2)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    def runningTaskForIndex(index: Int): TaskDescription = {
+      tasks.find { task =>
+        task.index == index && !sched.endedTasks.contains(task.taskId)
+      }.getOrElse {
+        throw new RuntimeException(s"couldn't find index $index in " +
+          s"tasks: ${tasks.map{t => t.index -> t.taskId}} with endedTasks:" +
+          s" ${sched.endedTasks.keys}")
+      }
+    }
+
+    // have each of the running tasks fail 3 times (not enough to abort the stage)
+    (0 until 3).foreach { attempt =>
+      Seq(3, 4).foreach { index =>
+        val task = runningTaskForIndex(index)
+        logInfo(s"failing task $task")
+        val endReason = ExceptionFailure("a", "b", Array(), "c", None)
+        manager.handleFailedTask(task.taskId, TaskState.FAILED, endReason)
+        sched.endedTasks(task.taskId) = endReason
+        assert(!manager.isZombie)
+        val nextTask = manager.resourceOffer(s"exec2", s"host2", NO_PREF)
+        assert(nextTask.isDefined, s"no offer for attempt $attempt of $index")
+        tasks += nextTask.get
+      }
+    }
+
+    // we can't be sure which one of our running tasks will get another speculative copy
+    val originalTasks = Seq(3, 4).map { index => index -> runningTaskForIndex(index) }.toMap
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption5.isDefined)
+    val speculativeTask = taskOption5.get
+    assert(speculativeTask.index === 3 || speculativeTask.index === 4)
+    assert(speculativeTask.taskId === 11)
+    assert(speculativeTask.executorId === "exec1")
+    assert(speculativeTask.attemptNumber === 4)
+    sched.backend = mock(classOf[SchedulerBackend])
+    // Complete the speculative attempt for the running task
+    manager.handleSuccessfulTask(speculativeTask.taskId, createTaskResult(3, accumUpdatesByTask(3)))
+    // Verify that it kills other running attempt
+    val origTask = originalTasks(speculativeTask.index)
+    verify(sched.backend).killTask(origTask.taskId, "exec2", true)
+    // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
+    // killed, so the FakeTaskScheduler is only told about the successful completion
+    // of the speculated task.
+    assert(sched.endedTasks(3) === Success)
+    // also because the scheduler is a mock, our manager isn't notified about the task killed event,
+    // so we do that manually
+    manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled)
+    // this task has "failed" 4 times, but one of them doesn't count, so keep running the stage
+    assert(manager.tasksSuccessful === 4)
+    assert(!manager.isZombie)
+
+    // now run another speculative task
+    val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOpt6.isDefined)
+    val speculativeTask2 = taskOpt6.get
+    assert(speculativeTask2.index === 3 || speculativeTask2.index === 4)
+    assert(speculativeTask2.index !== speculativeTask.index)
+    assert(speculativeTask2.attemptNumber === 4)
+    // Complete the speculative attempt for the running task
+    manager.handleSuccessfulTask(speculativeTask2.taskId,
+      createTaskResult(3, accumUpdatesByTask(3)))
+    // Verify that it kills other running attempt
+    val origTask2 = originalTasks(speculativeTask2.index)
+    verify(sched.backend).killTask(origTask2.taskId, "exec2", true)
+    assert(manager.tasksSuccessful === 5)
+    assert(manager.isZombie)
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index c1484b0..46aa9c3 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.serializer
 import com.esotericsoftware.kryo.Kryo
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.serializer.KryoDistributedTest._
 import org.apache.spark.util.Utils
 
@@ -29,7 +30,8 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
     val conf = new SparkConf(false)
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
-      .set("spark.task.maxFailures", "1")
+      .set(config.MAX_TASK_FAILURES, 1)
+      .set(config.BLACKLIST_ENABLED, false)
 
     val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
     conf.setJars(List(jar.getPath))

http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 82ce232..373e22d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1246,6 +1246,49 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.blacklist.enabled</code></td>
+  <td>
+    false
+  </td>
+  <td>
+    If set to "true", prevent Spark from scheduling tasks on executors that have been blacklisted
+    due to too many task failures. The blacklisting algorithm can be further controlled by the
+    other "spark.blacklist" configuration options.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.blacklist.task.maxTaskAttemptsPerExecutor</code></td>
+  <td>1</td>
+  <td>
+    (Experimental) For a given task, how many times it can be retried on one executor before the
+    executor is blacklisted for that task.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.blacklist.task.maxTaskAttemptsPerNode</code></td>
+  <td>2</td>
+  <td>
+    (Experimental) For a given task, how many times it can be retried on one node before the entire
+    node is blacklisted for that task.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.blacklist.stage.maxFailedTasksPerExecutor</code></td>
+  <td>2</td>
+  <td>
+    (Experimental) How many different tasks must fail on one executor, within one stage, before the
+    executor is blacklisted for that stage.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.blacklist.stage.maxFailedExecutorsPerNode</code></td>
+  <td>2</td>
+  <td>
+    (Experimental) How many different executors must be blacklisted for a given stage before the
+    entire node is blacklisted for that stage.
+  </td>
+</tr>
+<tr>
   <td><code>spark.speculation</code></td>
   <td>false</td>
   <td>

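Putting the new options together, a job opts in to stage-level blacklisting simply by setting the keys documented above, for example (the values here are illustrative choices, not recommendations):

    import org.apache.spark.SparkConf

    // Example configuration using the keys documented in the table above.
    val conf = new SparkConf()
      .setAppName("blacklist-example")
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
      .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
      .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
      .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "2")

The same keys can equally be passed with --conf on spark-submit.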
http://git-wip-us.apache.org/repos/asf/spark/blob/9ce7d3e5/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 6e60b0e..19b6d26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -23,6 +23,7 @@ import org.mockito.Mockito.mock
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -446,7 +447,7 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
       val conf = new SparkConf()
         .setMaster("local")
         .setAppName("test")
-        .set("spark.task.maxFailures", "1") // Don't retry the tasks to run this test quickly
+        .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
         .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
       val sc = new SparkContext(conf)
       try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org