Posted to commits@spark.apache.org by we...@apache.org on 2020/09/08 04:42:02 UTC

[spark] branch master updated: [SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 125cbe3  [SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl
125cbe3 is described below

commit 125cbe3ae0d664ddc80b5b83cc82a43a0cefb5ca
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Sep 8 04:40:13 2020 +0000

    [SPARK-32736][CORE] Avoid caching the removed decommissioned executors in TaskSchedulerImpl
    
    ### What changes were proposed in this pull request?
    
    The motivation of this PR is to avoid caching the removed decommissioned executors in `TaskSchedulerImpl`. The cache was introduced in https://github.com/apache/spark/pull/29422 and holds the `isHostDecommissioned` info for a while, so that if a task's `FetchFailure` event arrives after the executor loss event, `DAGScheduler` can still read `isHostDecommissioned` from the cache and unregister the host's shuffle map statuses when the host was decommissioned too.
    
    This PR achieves the same goal without the cache. Instead of saving a `workerLost` flag in `ExecutorUpdated` / `ExecutorDecommissionInfo` / `ExecutorDecommissionState`, we save the host directly as `workerHost: Option[String]`. When the host is decommissioned or lost too, `workerHost` holds the host address; otherwise it is `None`, indicating that only the executor is decommissioned or lost.
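
    A minimal sketch of the reworked shape and of the two construction cases (the object name and the host name here are illustrative placeholders, not part of the diff):

    ```scala
    object DecommissionInfoSketch {
      // workerHost replaces the old isHostDecommissioned / workerLost flag: it carries
      // the host address when the whole worker is decommissioned or lost, else None.
      case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)

      // Executor-only decommission: the host stays up, so no host address is attached.
      val execOnly = ExecutorDecommissionInfo("spark scale down")

      // The worker is decommissioned too: the cluster manager attaches the host so the
      // driver can later unregister the shuffle map statuses located on it.
      val wholeWorker = ExecutorDecommissionInfo("worker decommissioned", Some("worker-host-1"))
    }
    ```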
    
    Now that we have the host info, we can also unregister the host's shuffle map statuses when `executorLost` is triggered for the decommissioned executor.
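
    A hedged sketch of that flow, with the loss reasons reduced to just the field that matters here (the real classes in the diff carry additional parameters):

    ```scala
    object ExecutorLostSketch {
      sealed trait ExecutorLossReason
      case class ExecutorProcessLost(workerHost: Option[String] = None) extends ExecutorLossReason
      case class ExecutorDecommission(workerHost: Option[String] = None) extends ExecutorLossReason
      case object ExecutorKilled extends ExecutorLossReason

      // A defined workerHost is forwarded as hostToUnregisterOutputs, so the shuffle map
      // statuses of every executor on that host are dropped, not only the lost executor's.
      def hostToUnregisterOutputs(reason: ExecutorLossReason): Option[String] = reason match {
        case ExecutorProcessLost(workerHost)  => workerHost
        case ExecutorDecommission(workerHost) => workerHost
        case _                                => None
      }
    }
    ```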
    
    Besides, this PR also includes a few cleanups around the touched code.
    
    ### Why are the changes needed?
    
    It helps to unregister the shuffle map statuses earlier in both the decommission and the normal executor-lost cases.
    
    It also saves memory in `TaskSchedulerImpl` and simplifies the code a little bit.
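
    A rough sketch of the simplified bookkeeping (names follow the diff; the timestamp is passed in directly here instead of coming from the scheduler's clock):

    ```scala
    import scala.collection.mutable

    object PendingDecommissionSketch {
      case class ExecutorDecommissionState(startTime: Long, workerHost: Option[String] = None)

      // One map keyed by executor id; an entry is simply dropped once the executor is lost,
      // instead of being copied into a TTL cache for later FetchFailure lookups.
      val executorsPendingDecommission = new mutable.HashMap[String, ExecutorDecommissionState]

      def executorDecommission(executorId: String, workerHost: Option[String], nowMs: Long): Unit =
        executorsPendingDecommission(executorId) = ExecutorDecommissionState(nowMs, workerHost)

      def executorLost(executorId: String): Unit =
        executorsPendingDecommission.remove(executorId)
    }
    ```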
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR only refactors the code. The original behaviour should be covered by `DecommissionWorkerSuite`.
    
    Closes #29579 from Ngone51/impr-decom.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   |  2 +-
 .../org/apache/spark/deploy/DeployMessage.scala    |  4 +-
 .../spark/deploy/client/StandaloneAppClient.scala  |  6 +--
 .../client/StandaloneAppClientListener.scala       |  2 +-
 .../org/apache/spark/deploy/master/Master.scala    | 11 ++--
 .../executor/CoarseGrainedExecutorBackend.scala    |  7 +--
 .../org/apache/spark/internal/config/package.scala | 10 ----
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 17 +++---
 .../spark/scheduler/ExecutorDecommissionInfo.scala | 10 ++--
 .../spark/scheduler/ExecutorLossReason.scala       | 10 ++--
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 61 +++-------------------
 .../apache/spark/scheduler/TaskSetManager.scala    |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 38 ++++++--------
 .../cluster/StandaloneSchedulerBackend.scala       |  7 ++-
 .../spark/deploy/client/AppClientSuite.scala       |  4 +-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 16 ++++--
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 36 ++++---------
 .../spark/scheduler/TaskSetManagerSuite.scala      |  9 ++--
 .../WorkerDecommissionExtendedSuite.scala          |  2 +-
 .../spark/scheduler/WorkerDecommissionSuite.scala  |  2 +-
 .../BlockManagerDecommissionIntegrationSuite.scala |  2 +-
 .../scheduler/ExecutorAllocationManager.scala      |  2 +-
 .../scheduler/ExecutorAllocationManagerSuite.scala |  2 +-
 23 files changed, 98 insertions(+), 164 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c298931..b6e14e8 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -580,7 +580,7 @@ private[spark] class ExecutorAllocationManager(
       // when the task backlog decreased.
       if (decommissionEnabled) {
         val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
-          id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
+          id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
         client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
       } else {
         client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index b7a64d75..83f373d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -187,8 +187,10 @@ private[deploy] object DeployMessages {
     Utils.checkHostPort(hostPort)
   }
 
+  // When the host of Worker is lost or decommissioned, the `workerHost` is the host address
+  // of that Worker. Otherwise, it's None.
   case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-    exitStatus: Option[Int], workerLost: Boolean)
+    exitStatus: Option[Int], workerHost: Option[String])
 
   case class ApplicationRemoved(message: String)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index a6da839..e5efb15 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
           cores))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
-      case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
+      case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>
         val fullId = appId + "/" + id
         val messageText = message.map(s => " (" + s + ")").getOrElse("")
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost)
         } else if (state == ExecutorState.DECOMMISSIONED) {
           listener.executorDecommissioned(fullId,
-            ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
+            ExecutorDecommissionInfo(message.getOrElse(""), workerHost))
         }
 
       case WorkerRemoved(id, host, message) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index e72f7e9..76970ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -39,7 +39,7 @@ private[spark] trait StandaloneAppClientListener {
       fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
 
   def executorRemoved(
-      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+      fullId: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit
 
   def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 220e1c9..48516cd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -308,7 +308,7 @@ private[deploy] class Master(
             appInfo.resetRetryCount()
           }
 
-          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
+          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))
 
           if (ExecutorState.isFinished(state)) {
             // Remove this executor from the worker and app
@@ -909,9 +909,10 @@ private[deploy] class Master(
         exec.application.driver.send(ExecutorUpdated(
           exec.id, ExecutorState.DECOMMISSIONED,
           Some("worker decommissioned"), None,
-          // workerLost is being set to true here to let the driver know that the host (aka. worker)
-          // is also being decommissioned.
-          workerLost = true))
+          // worker host is being set here to let the driver know that the host (aka. worker)
+          // is also being decommissioned. So the driver can unregister all the shuffle map
+          // statuses located at this host when it receives the executor lost event.
+          Some(worker.host)))
         exec.state = ExecutorState.DECOMMISSIONED
         exec.application.removeExecutor(exec)
       }
@@ -932,7 +933,7 @@ private[deploy] class Master(
     for (exec <- worker.executors.values) {
       logInfo("Telling app of lost executor: " + exec.id)
       exec.application.driver.send(ExecutorUpdated(
-        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
+        exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host)))
       exec.state = ExecutorState.LOST
       exec.application.removeExecutor(exec)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 07258f2..48045ba 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -172,10 +172,7 @@ private[spark] class CoarseGrainedExecutorBackend(
           driver match {
             case Some(endpoint) =>
               logInfo("Sending DecommissionExecutor to driver.")
-              endpoint.send(
-                DecommissionExecutor(
-                  executorId,
-                  ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
+              endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
             case _ =>
               logError("No registered driver to send Decommission to.")
           }
@@ -275,7 +272,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       // Tell master we are are decommissioned so it stops trying to schedule us
       if (driver.nonEmpty) {
         driver.get.askSync[Boolean](DecommissionExecutor(
-            executorId, ExecutorDecommissionInfo(msg, false)))
+          executorId, ExecutorDecommissionInfo(msg)))
       } else {
         logError("No driver to message decommissioning.")
       }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4b6770b..9a7039a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1889,16 +1889,6 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
-  private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
-    ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
-      .doc("Duration for which a decommissioned executor's information will be kept after its" +
-        "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
-        "decommissioning even after the mapper executor has been decommissioned. This allows " +
-        "eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
-      .version("3.1.0")
-      .timeConf(TimeUnit.SECONDS)
-      .createWithDefaultString("5m")
-
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 18cd241..080e0e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler(
             val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
             val isHostDecommissioned = taskScheduler
               .getExecutorDecommissionState(bmAddress.executorId)
-              .exists(_.isHostDecommissioned)
+              .exists(_.workerHost.isDefined)
 
             // Shuffle output of all executors on host `bmAddress.host` may be lost if:
             // - External shuffle service is enabled, so we assume that all shuffle data on node is
@@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler(
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      workerLost: Boolean): Unit = {
+      workerHost: Option[String]): Unit = {
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
     removeExecutorAndUnregisterOutputs(
       execId = execId,
       fileLost = fileLost,
-      hostToUnregisterOutputs = None,
+      hostToUnregisterOutputs = workerHost,
       maybeEpoch = None)
   }
 
@@ -2366,11 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
       dagScheduler.handleExecutorAdded(execId, host)
 
     case ExecutorLost(execId, reason) =>
-      val workerLost = reason match {
-        case ExecutorProcessLost(_, true, _) => true
-        case _ => false
+      val workerHost = reason match {
+        case ExecutorProcessLost(_, workerHost, _) => workerHost
+        case ExecutorDecommission(workerHost) => workerHost
+        case _ => None
       }
-      dagScheduler.handleExecutorLost(execId, workerLost)
+      dagScheduler.handleExecutorLost(execId, workerHost)
 
     case WorkerRemoved(workerId, host, message) =>
       dagScheduler.handleWorkerRemoved(workerId, host, message)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
index 48ae879..7eec070 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler
 /**
  * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
- * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
- *                             being decommissioned too. Used to infer if the shuffle data might
- *                             be lost even if the external shuffle service is enabled.
+ * @param workerHost When workerHost is defined, it means the host (aka the `node` or `worker`
+ *                in other places) has been decommissioned too. Used to infer if the
+ *                shuffle data might be lost even if the external shuffle service is enabled.
  */
 private[spark]
-case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)
 
 /**
  * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
@@ -37,4 +37,4 @@ case class ExecutorDecommissionState(
     // to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL
     // is configured.
     startTime: Long,
-    isHostDecommissioned: Boolean)
+    workerHost: Option[String] = None)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 671deda..f2eb4a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -53,14 +53,15 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
 
 /**
  * @param _message human readable loss reason
- * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ * @param workerHost it's defined when the host is confirmed lost too (i.e. including
+ *                   shuffle service)
  * @param causedByApp whether the loss of the executor is the fault of the running app.
  *                    (assumed true by default unless known explicitly otherwise)
  */
 private[spark]
 case class ExecutorProcessLost(
     _message: String = "Executor Process Lost",
-    workerLost: Boolean = false,
+    workerHost: Option[String] = None,
     causedByApp: Boolean = true)
   extends ExecutorLossReason(_message)
 
@@ -69,5 +70,8 @@ case class ExecutorProcessLost(
  *
  * This is used by the task scheduler to remove state associated with the executor, but
  * not yet fail any tasks that were running in the executor before the executor is "fully" lost.
+ *
+ * @param workerHost it is defined when the worker is decommissioned too
  */
-private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
+ extends ExecutorLossReason("Executor decommission.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d446638..107c517 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -143,18 +143,6 @@ private[spark] class TaskSchedulerImpl(
   // continue to run even after being asked to decommission, but they will eventually exit.
   val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]
 
-  // When they exit and we know of that via heartbeat failure, we will add them to this cache.
-  // This cache is consulted to know if a fetch failure is because a source executor was
-  // decommissioned.
-  lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
-    .expireAfterWrite(
-      conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
-    .ticker(new Ticker{
-      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
-    })
-    .build[String, ExecutorDecommissionState]()
-    .asMap()
-
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }
@@ -922,28 +910,8 @@ private[spark] class TaskSchedulerImpl(
     synchronized {
       // Don't bother noting decommissioning for executors that we don't know about
       if (executorIdToHost.contains(executorId)) {
-        val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
-        val newDecomState = if (oldDecomStateOpt.isEmpty) {
-          // This is the first time we are hearing of decommissioning this executor,
-          // so create a brand new state.
-          ExecutorDecommissionState(
-            clock.getTimeMillis(),
-            decommissionInfo.isHostDecommissioned)
-        } else {
-          val oldDecomState = oldDecomStateOpt.get
-          if (!oldDecomState.isHostDecommissioned && decommissionInfo.isHostDecommissioned) {
-            // Only the cluster manager is allowed to send decommission messages with
-            // isHostDecommissioned set. So the new decommissionInfo is from the cluster
-            // manager and is thus authoritative. Flip isHostDecommissioned to true but keep the old
-            // decommission start time.
-            ExecutorDecommissionState(
-              oldDecomState.startTime,
-              isHostDecommissioned = true)
-          } else {
-            oldDecomState
-          }
-        }
-        executorsPendingDecommission(executorId) = newDecomState
+        executorsPendingDecommission(executorId) =
+          ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.workerHost)
       }
     }
     rootPool.executorDecommission(executorId)
@@ -952,26 +920,11 @@ private[spark] class TaskSchedulerImpl(
 
   override def getExecutorDecommissionState(executorId: String)
     : Option[ExecutorDecommissionState] = synchronized {
-    executorsPendingDecommission
-      .get(executorId)
-      .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
+    executorsPendingDecommission.get(executorId)
   }
 
-  override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
-    val reason = givenReason match {
-      // Handle executor process loss due to decommissioning
-      case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
-        val executorDecommissionState = getExecutorDecommissionState(executorId)
-        ExecutorProcessLost(
-          message,
-          // Also mark the worker lost if we know that the host was decommissioned
-          origWorkerLost || executorDecommissionState.exists(_.isHostDecommissioned),
-          // Executor loss is certainly not caused by app if we knew that this executor is being
-          // decommissioned
-          causedByApp = executorDecommissionState.isEmpty && origCausedByApp)
-      case e => e
-    }
 
     synchronized {
       if (executorIdToRunningTaskIds.contains(executorId)) {
@@ -1060,9 +1013,7 @@ private[spark] class TaskSchedulerImpl(
       }
     }
 
-
-    val decomState = executorsPendingDecommission.remove(executorId)
-    decomState.foreach(decommissionedExecutorsRemoved.put(executorId, _))
+    executorsPendingDecommission.remove(executorId)
 
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
@@ -1104,7 +1055,7 @@ private[spark] class TaskSchedulerImpl(
   // exposed for test
   protected final def isHostDecommissioned(host: String): Boolean = {
     hostToExecutors.get(host).exists { executors =>
-      executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned))
+      executors.exists(e => getExecutorDecommissionState(e).exists(_.workerHost.isDefined))
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index ff03876..673fe4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -988,7 +988,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case exited: ExecutorExited => exited.exitCausedByApp
-        case ExecutorKilled => false
+        case ExecutorKilled | ExecutorDecommission(_) => false
         case ExecutorProcessLost(_, _, false) => false
         case _ => true
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ca65731..0f14412 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -92,8 +92,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   private val executorsPendingLossReason = new HashSet[String]
 
-  // Executors which are being decommissioned
-  protected val executorsPendingDecommission = new HashSet[String]
+  // Executors which are being decommissioned. Maps from executorId to workerHost.
+  protected val executorsPendingDecommission = new HashMap[String, Option[String]]
 
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -390,16 +390,23 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
-          val killed = CoarseGrainedSchedulerBackend.this.synchronized {
+          val lossReason = CoarseGrainedSchedulerBackend.this.synchronized {
             addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
             executorsPendingLossReason -= executorId
-            executorsPendingDecommission -= executorId
-            executorsPendingToRemove.remove(executorId).getOrElse(false)
+            val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
+            val workerHostOpt = executorsPendingDecommission.remove(executorId)
+            if (killedByDriver) {
+              ExecutorKilled
+            } else if (workerHostOpt.isDefined) {
+              ExecutorDecommission(workerHostOpt.get)
+            } else {
+              reason
+            }
           }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
-          scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
+          scheduler.executorLost(executorId, lossReason)
           listenerBus.post(
             SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
         case None =>
@@ -462,11 +469,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
       adjustTargetNumExecutors: Boolean): Seq[String] = {
 
-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
       CoarseGrainedSchedulerBackend.this.synchronized {
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {
-          executorsPendingDecommission += executorId
+          executorsPendingDecommission(executorId) = decomInfo.workerHost
           true
         } else {
           false
@@ -489,19 +496,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       decomInfo: ExecutorDecommissionInfo): Boolean = {
 
     logInfo(s"Asking executor $executorId to decommissioning.")
-    try {
-      scheduler.executorDecommission(executorId, decomInfo)
-      if (driverEndpoint != null) {
-        logInfo("Propagating executor decommission to driver.")
-        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
-      }
-    } catch {
-      case e: Exception =>
-        logError(s"Unexpected error during decommissioning ${e.toString}", e)
-        return false
-    }
+    scheduler.executorDecommission(executorId, decomInfo)
     // Send decommission message to the executor (it could have originated on the executor
-    // but not necessarily.
+    // but not necessarily).
     CoarseGrainedSchedulerBackend.this.synchronized {
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
@@ -656,7 +653,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       !executorsPendingToRemove.contains(id) &&
       !executorsPendingLossReason.contains(id) &&
       !executorsPendingDecommission.contains(id)
-
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 3acb6f1..34b03df 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -165,10 +165,13 @@ private[spark] class StandaloneSchedulerBackend(
   }
 
   override def executorRemoved(
-      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
+      fullId: String,
+      message: String,
+      exitStatus: Option[Int],
+      workerHost: Option[String]): Unit = {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-      case None => ExecutorProcessLost(message, workerLost = workerLost)
+      case None => ExecutorProcessLost(message, workerHost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 85ad4bd..fe88822 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -129,7 +129,7 @@ class AppClientSuite
         // We only record decommissioning for the executor we've requested
         assert(ci.listener.execDecommissionedMap.size === 1)
         val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId)
-        assert(decommissionInfo != null && decommissionInfo.isHostDecommissioned,
+        assert(decommissionInfo != null && decommissionInfo.workerHost.isDefined,
           s"$executorId should have been decommissioned along with its worker")
       }
 
@@ -245,7 +245,7 @@ class AppClientSuite
     }
 
     def executorRemoved(
-        id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
+        id: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit = {
       execRemovedList.add(id)
     }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a7f8aff..4367658 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -848,9 +848,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   private val shuffleFileLossTests = Seq(
-    ("executor process lost with shuffle service", ExecutorProcessLost("", false), true, false),
-    ("worker lost with shuffle service", ExecutorProcessLost("", true), true, true),
-    ("worker lost without shuffle service", ExecutorProcessLost("", true), false, true),
+    ("executor process lost with shuffle service", ExecutorProcessLost("", None), true, false),
+    ("worker lost with shuffle service", ExecutorProcessLost("", Some("hostA")), true, true),
+    ("worker lost without shuffle service", ExecutorProcessLost("", Some("hostA")), false, true),
     ("executor failure with shuffle service", ExecutorKilled, true, false),
     ("executor failure without shuffle service", ExecutorKilled, false, true))
 
@@ -874,10 +874,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
       submit(reduceRdd, Array(0))
       completeShuffleMapStageSuccessfully(0, 0, 1)
+      val expectHostFileLoss = event match {
+        case ExecutorProcessLost(_, workerHost, _) => workerHost.isDefined
+        case _ => false
+      }
       runEvent(ExecutorLost("hostA-exec", event))
       verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
       if (expectFileLoss) {
-        verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
+        if (expectHostFileLoss) {
+          verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA")
+        } else {
+          verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
+        }
         intercept[MetadataFetchFailedException] {
           mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
         }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 26c9d91..f29eb70 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -158,8 +158,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       .exists(s => s.contains(exec0) && s.contains(exec1)))
     assert(scheduler.getExecutorsAliveOnHost(host1).exists(_.contains(exec2)))
 
-    scheduler.executorDecommission(exec1, ExecutorDecommissionInfo("test", false))
-    scheduler.executorDecommission(exec2, ExecutorDecommissionInfo("test", true))
+    scheduler.executorDecommission(exec1, ExecutorDecommissionInfo("test", None))
+    scheduler.executorDecommission(exec2, ExecutorDecommissionInfo("test", Some(host1)))
 
     assert(scheduler.isExecutorAlive(exec0))
     assert(!Seq(exec1, exec2).exists(scheduler.isExecutorAlive))
@@ -1864,18 +1864,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("scheduler should keep the decommission state where host was decommissioned") {
     val clock = new ManualClock(10000L)
     val scheduler = setupSchedulerForDecommissionTests(clock, 2)
-    val oldTime = clock.getTimeMillis()
-    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", false))
-    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", true))
-
-    clock.advance(3000L)
-    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0 new", false))
-    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1 new", false))
+    val decomTime = clock.getTimeMillis()
+    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", None))
+    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", Some("host1")))
 
     assert(scheduler.getExecutorDecommissionState("executor0")
-      === Some(ExecutorDecommissionState(oldTime, false)))
+      === Some(ExecutorDecommissionState(decomTime, None)))
     assert(scheduler.getExecutorDecommissionState("executor1")
-      === Some(ExecutorDecommissionState(oldTime, true)))
+      === Some(ExecutorDecommissionState(decomTime, Some("host1"))))
     assert(scheduler.getExecutorDecommissionState("executor2").isEmpty)
   }
 
@@ -1890,7 +1886,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)
     scheduler.executorLost("executor0", ExecutorExited(0, false, "normal"))
     assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)
-    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", false))
+    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", None))
     assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)
 
     // 0th task just died above
@@ -1903,31 +1899,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
 
     // executor 1 is decommissioned before loosing
-    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
+    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None))
     assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
     clock.advance(2000)
 
     // executor1 is eventually lost
     scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
-    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
     assert(scheduler.executorsPendingDecommission.isEmpty)
     // So now both the tasks are no longer running
     assert(manager.copiesRunning.take(2) === Array(0, 0))
     clock.advance(2000)
 
-    // Decommission state should hang around a bit after removal ...
-    assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
-    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
-    clock.advance(2000)
-    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
-    assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
-
-    // The default timeout for expiry is 300k milliseconds (5 minutes) which completes now,
-    // and the executor1's decommission state should finally be purged.
-    clock.advance(300000)
-    assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
-    assert(scheduler.decommissionedExecutorsRemoved.isEmpty)
-
     // Now give it some resources and both tasks should be rerun
     val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(
       WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 86d4e92..c389fd2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -681,8 +681,8 @@ class TaskSetManagerSuite
     assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY))
 
     // Decommission all executors on host0, to mimic CoarseGrainedSchedulerBackend.
-    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", true))
-    sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", true))
+    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", Some(host0)))
+    sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", Some(host0)))
 
     assert(manager.myLocalityLevels === Array(ANY))
   }
@@ -707,7 +707,7 @@ class TaskSetManagerSuite
     assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY))
 
     // Decommission the only executor (without the host) that the task is interested in running on.
-    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", false))
+    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", None))
 
     assert(manager.myLocalityLevels === Array(NODE_LOCAL, ANY))
   }
@@ -2029,8 +2029,7 @@ class TaskSetManagerSuite
     // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be now
     // checked if they should be speculated.
     // (TASK 2 -> 15, TASK 3 -> 15)
-    sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom",
-      isHostDecommissioned = false))
+    sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom", None))
     assert(sched.getExecutorDecommissionState("exec2").map(_.startTime) ===
       Some(clock.getTimeMillis()))
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 4264d45..129eb8b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -64,7 +64,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
 
       val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
       sc.getExecutorIds().tail.foreach { id =>
-        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false),
+        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", None),
           adjustTargetNumExecutors = false)
         assert(rdd3.sortByKey().collect().length === 100)
       }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index 1ccb53f..83bb66e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -77,7 +77,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
     // Make the executors decommission, finish, exit, and not be replaced.
-    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))).toArray
+    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", None))).toArray
     sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true)
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 37836a9..094b893 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -192,7 +192,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", isHostDecommissioned = false),
+      ExecutorDecommissionInfo("", None),
       adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 8f74d2d..1037950 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -137,7 +137,7 @@ private[streaming] class ExecutorAllocationManager(
         val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
         if (conf.get(DECOMMISSION_ENABLED)) {
           client.decommissionExecutor(execIdToRemove,
-            ExecutorDecommissionInfo("spark scale down", false),
+            ExecutorDecommissionInfo("spark scale down", None),
             adjustTargetNumExecutors = true)
         } else {
           client.killExecutor(execIdToRemove)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index ec3ff45..f187071 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -98,7 +98,7 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
       /** Verify that a particular executor was scaled down. */
       def verifyScaledDownExec(expectedExec: Option[String]): Unit = {
         if (expectedExec.nonEmpty) {
-          val decomInfo = ExecutorDecommissionInfo("spark scale down", false)
+          val decomInfo = ExecutorDecommissionInfo("spark scale down", None)
           if (decommissioning) {
             verify(allocationClient, times(1)).decommissionExecutor(
               meq(expectedExec.get), meq(decomInfo), meq(true))

