You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2020/02/14 20:37:58 UTC

[spark] branch master updated: [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d273a2b  [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
d273a2b is described below

commit d273a2bb0fac452a97f5670edd69d3e452e3e57e
Author: Holden Karau <hk...@apple.com>
AuthorDate: Fri Feb 14 12:36:52 2020 -0800

    [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
    
    This PR is based on an existing/previous PR - https://github.com/apache/spark/pull/19045
    
    ### What changes were proposed in this pull request?
    
    This change adds a decommissioning state that we can enter when the cloud provider/scheduler lets us know we aren't going to be removed immediately but instead will be removed soon. This concept fits nicely in K8s and also with spot-instances on AWS / preemptible instances, for all of which we can get a notice that our host is going away. For now we simply stop scheduling jobs; in the future we could perform some kind of migration of data during scale-down, or at least stop accepting new  [...]
    
    There is a design document at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?usp=sharing
    
    ### Why are the changes needed?
    
    With the growing move to preemptible multi-tenancy, serverless environments, and spot-instances, better handling of node scale-down is required.
    
    ### Does this PR introduce any user-facing change?
    
    There is no API change; however, an additional configuration flag is added to enable/disable this behaviour.
    
    ### How was this patch tested?
    
    New integration tests in the Spark K8s integration testing. Extension of the AppClientSuite to test decommissioning separately from K8s.
    
    Closes #26440 from holdenk/SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4.
    
    Lead-authored-by: Holden Karau <hk...@apple.com>
    Co-authored-by: Holden Karau <ho...@pigscanfly.ca>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../org/apache/spark/deploy/DeployMessage.scala    |  11 ++
 .../org/apache/spark/deploy/ExecutorState.scala    |   8 +-
 .../spark/deploy/client/StandaloneAppClient.scala  |   2 +
 .../client/StandaloneAppClientListener.scala       |   2 +
 .../org/apache/spark/deploy/master/Master.scala    |  31 ++++++
 .../org/apache/spark/deploy/worker/Worker.scala    |  26 +++++
 .../executor/CoarseGrainedExecutorBackend.scala    |  39 ++++++-
 .../scala/org/apache/spark/executor/Executor.scala |  16 +++
 .../org/apache/spark/internal/config/Worker.scala  |   5 +
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |   2 +
 .../spark/scheduler/ExecutorLossReason.scala       |   8 ++
 .../scala/org/apache/spark/scheduler/Pool.scala    |   4 +
 .../org/apache/spark/scheduler/Schedulable.scala   |   1 +
 .../apache/spark/scheduler/SchedulerBackend.scala  |   3 +
 .../org/apache/spark/scheduler/TaskScheduler.scala |   5 +
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |   5 +
 .../apache/spark/scheduler/TaskSetManager.scala    |   6 ++
 .../cluster/CoarseGrainedClusterMessage.scala      |   2 +
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  66 +++++++++++-
 .../cluster/StandaloneSchedulerBackend.scala       |   6 ++
 .../scala/org/apache/spark/util/SignalUtils.scala  |   2 +-
 .../spark/deploy/client/AppClientSuite.scala       |  39 ++++++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |   2 +
 .../scheduler/ExternalClusterManagerSuite.scala    |   1 +
 .../spark/scheduler/WorkerDecommissionSuite.scala  |  84 +++++++++++++++
 .../apache/spark/deploy/k8s/KubernetesConf.scala   |   3 +
 .../k8s/features/BasicExecutorFeatureStep.scala    |  20 +++-
 .../docker/src/main/dockerfiles/spark/Dockerfile   |   4 +-
 .../docker/src/main/dockerfiles/spark/decom.sh     |  35 ++++++
 .../src/main/dockerfiles/spark/entrypoint.sh       |   6 +-
 .../dev/dev-run-integration-tests.sh               |   9 +-
 .../k8s/integrationtest/DecommissionSuite.scala    |  49 +++++++++
 .../k8s/integrationtest/KubernetesSuite.scala      | 117 ++++++++++++++++-----
 .../integration-tests/tests/decommissioning.py     |  45 ++++++++
 sbin/decommission-slave.sh                         |  57 ++++++++++
 sbin/spark-daemon.sh                               |  15 +++
 36 files changed, 690 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index fba371d..18305ad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }
 
+  /**
+   * @param id the worker id
+   * @param worker the worker endpoint ref
+   */
+  case class WorkerDecommission(
+      id: String,
+      worker: RpcEndpointRef)
+    extends DeployMessage
+
   case class ExecutorStateChanged(
       appId: String,
       execId: Int,
@@ -149,6 +158,8 @@ private[deploy] object DeployMessages {
 
   case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
 
+  case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.
+
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index 69c98e2..0751bcf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,13 @@ package org.apache.spark.deploy
 
 private[deploy] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
+  // DECOMMISSIONED isn't listed as finished since we don't want to remove the executor from
+  // the worker and the executor still exists - but we do want to avoid scheduling new tasks on it.
+  private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)
+
+  def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state)
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 8f17159..eedf5e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -180,6 +180,8 @@ private[spark] class StandaloneAppClient(
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
           listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
+        } else if (state == ExecutorState.DECOMMISSIONED) {
+          listener.executorDecommissioned(fullId, message.getOrElse(""))
         }
 
       case WorkerRemoved(id, host, message) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index d8bc1a8..2e38a68 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener {
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
 
+  def executorDecommissioned(fullId: String, message: String): Unit
+
   def workerRemoved(workerId: String, host: String, message: String): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 8d3795c..71df5df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -243,6 +243,15 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
 
+    case WorkerDecommission(id, workerRef) =>
+      logInfo("Recording worker %s decommissioning".format(id))
+      if (state == RecoveryState.STANDBY) {
+        workerRef.send(MasterInStandby)
+      } else {
+        // We use foreach since get gives us an option and we can skip the failures.
+        idToWorker.get(id).foreach(decommissionWorker)
+      }
+
     case RegisterWorker(
       id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
       masterAddress, resources) =>
@@ -313,7 +322,9 @@ private[deploy] class Master(
             // Only retry certain number of times so we don't go into an infinite loop.
             // Important note: this code path is not exercised by tests, so be very careful when
             // changing this `if` condition.
+            // We also don't count failures from decommissioned workers since they are "expected."
             if (!normalExit
+                && oldState != ExecutorState.DECOMMISSIONED
                 && appInfo.incrementRetryCount() >= maxExecutorRetries
                 && maxExecutorRetries >= 0) { // < 0 disables this application-killing path
               val execs = appInfo.executors.values
@@ -850,6 +861,26 @@ private[deploy] class Master(
     true
   }
 
+  private def decommissionWorker(worker: WorkerInfo): Unit = {
+    if (worker.state != WorkerState.DECOMMISSIONED) {
+      logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
+      worker.setState(WorkerState.DECOMMISSIONED)
+      for (exec <- worker.executors.values) {
+        logInfo("Telling app of decommission executors")
+        exec.application.driver.send(ExecutorUpdated(
+          exec.id, ExecutorState.DECOMMISSIONED,
+          Some("worker decommissioned"), None, workerLost = false))
+        exec.state = ExecutorState.DECOMMISSIONED
+        exec.application.removeExecutor(exec)
+      }
+      // On recovery do not add a decommissioned executor
+      persistenceEngine.removeWorker(worker)
+    } else {
+      logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned".
+        format(worker.id, worker.host, worker.port))
+    }
+  }
+
   private def removeWorker(worker: WorkerInfo, msg: String): Unit = {
     logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
     worker.setState(WorkerState.DEAD)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4be495a..d988bce 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -67,6 +67,14 @@ private[deploy] class Worker(
   Utils.checkHost(host)
   assert (port > 0)
 
+  // If worker decommissioning is enabled register a handler on PWR to shutdown.
+  if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+    logInfo("Registering SIGPWR handler to trigger decommissioning.")
+    SignalUtils.register("PWR")(decommissionSelf)
+  } else {
+    logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
+  }
+
   // A scheduled executor used to send messages at the specified time.
   private val forwardMessageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
@@ -128,6 +136,7 @@ private[deploy] class Worker(
   private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
   private var registered = false
   private var connected = false
+  private var decommissioned = false
   private val workerId = generateWorkerId()
   private val sparkHome =
     if (sys.props.contains(IS_TESTING.key)) {
@@ -549,6 +558,8 @@ private[deploy] class Worker(
     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+      } else if (decommissioned) {
+        logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
       } else {
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -672,6 +683,9 @@ private[deploy] class Worker(
     case ApplicationFinished(id) =>
       finishedApps += id
       maybeCleanupApplication(id)
+
+    case DecommissionSelf =>
+      decommissionSelf()
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -771,6 +785,18 @@ private[deploy] class Worker(
     }
   }
 
+  private[deploy] def decommissionSelf(): Boolean = {
+    if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+      logDebug("Decommissioning self")
+      decommissioned = true
+      sendToMaster(WorkerDecommission(workerId, self))
+    } else {
+      logWarning("Asked to decommission self, but decommissioning not enabled")
+    }
+    // Return true since can be called as a signal handler
+    true
+  }
+
   private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
     val driverId = driverStateChanged.driverId
     val exception = driverStateChanged.exception
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 25c5b98..faf03a6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -43,7 +43,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, ThreadUtils, Utils}
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -64,6 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
+  @volatile private var decommissioned = false
   @volatile var driver: Option[RpcEndpointRef] = None
 
   // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@@ -80,6 +81,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
 
   override def onStart(): Unit = {
+    logInfo("Registering PWR handler.")
+    SignalUtils.register("PWR")(decommissionSelf)
+
     logInfo("Connecting to driver: " + driverUrl)
     try {
       _resources = parseOrFindResources(resourcesFileOpt)
@@ -160,6 +164,16 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
+        if (decommissioned) {
+          logError("Asked to launch a task while decommissioned.")
+          driver match {
+            case Some(endpoint) =>
+              logInfo("Sending DecommissionExecutor to driver.")
+              endpoint.send(DecommissionExecutor(executorId))
+            case _ =>
+              logError("No registered driver to send Decommission to.")
+          }
+        }
         val taskDesc = TaskDescription.decode(data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
         taskResources(taskDesc.taskId) = taskDesc.resources
@@ -242,6 +256,29 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     System.exit(code)
   }
+
+  private def decommissionSelf(): Boolean = {
+    logInfo("Decommissioning self w/sync")
+    try {
+      decommissioned = true
+      // Tell master we are are decommissioned so it stops trying to schedule us
+      if (driver.nonEmpty) {
+        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+      } else {
+        logError("No driver to message decommissioning.")
+      }
+      if (executor != null) {
+        executor.decommission()
+      }
+      logInfo("Done decommissioning self.")
+      // Return true since we are handling a signal
+      true
+    } catch {
+      case e: Exception =>
+        logError(s"Error ${e} during attempt to decommission self")
+        false
+    }
+  }
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 8aeb16f..2bfa1ce 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -216,16 +216,32 @@ private[spark] class Executor(
    */
   private var heartbeatFailures = 0
 
+  /**
+   * Flag to prevent launching new tasks while decommissioned. There could be a race condition
+   * accessing this, but decommissioning is only intended to help not be a hard stop.
+   */
+  private var decommissioned = false
+
   heartbeater.start()
 
   metricsPoller.start()
 
   private[executor] def numRunningTasks: Int = runningTasks.size()
 
+  /**
+   * Mark an executor for decommissioning and avoid launching new tasks.
+   */
+  private[spark] def decommission(): Unit = {
+    decommissioned = true
+  }
+
   def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
     val tr = new TaskRunner(context, taskDescription)
     runningTasks.put(taskDescription.taskId, tr)
     threadPool.execute(tr)
+    if (decommissioned) {
+      log.error(s"Launching a task while in decommissioned state.")
+    }
   }
 
   def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index f1eaae2..2b175c1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -71,4 +71,9 @@ private[spark] object Worker {
     ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
     .intConf
     .createWithDefault(100)
+
+  private[spark] val WORKER_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.worker.decommission.enabled")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 64d2032..a26b579 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -361,6 +361,7 @@ abstract class RDD[T: ClassTag](
       readCachedBlock = false
       computeOrReadCheckpoint(partition, context)
     }) match {
+      // Block hit.
       case Left(blockResult) =>
         if (readCachedBlock) {
           val existingMetrics = context.taskMetrics().inputMetrics
@@ -374,6 +375,7 @@ abstract class RDD[T: ClassTag](
         } else {
           new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
         }
+      // Need to compute the block.
       case Right(iter) =>
         new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 46a35b6..ee31093 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
 private[spark]
 case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
   extends ExecutorLossReason(_message)
+
+/**
+ * A loss reason that means the executor is marked for decommissioning.
+ *
+ * This is used by the task scheduler to remove state associated with the executor, but
+ * not yet fail any tasks that were running in the executor before the executor is "fully" lost.
+ */
+private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 80805df..2e2851e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -88,6 +88,10 @@ private[spark] class Pool(
     schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
   }
 
+  override def executorDecommission(executorId: String): Unit = {
+    schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
+  }
+
   override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
     var shouldRevive = false
     for (schedulable <- schedulableQueue.asScala) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index b6f88ed..8cc239c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -43,6 +43,7 @@ private[spark] trait Schedulable {
   def removeSchedulable(schedulable: Schedulable): Unit
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
+  def executorDecommission(executorId: String): Unit
   def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
   def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 9159d2a..4752353 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -27,6 +27,9 @@ private[spark] trait SchedulerBackend {
 
   def start(): Unit
   def stop(): Unit
+  /**
+   * Update the current offers and schedule tasks
+   */
   def reviveOffers(): Unit
   def defaultParallelism(): Int
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 15f5d20..e9e638a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -99,6 +99,11 @@ private[spark] trait TaskScheduler {
   def applicationId(): String = appId
 
   /**
+   * Process a decommissioning executor.
+   */
+  def executorDecommission(executorId: String): Unit
+
+  /**
    * Process a lost executor
    */
   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bf92081..1b197c4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -734,6 +734,11 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  override def executorDecommission(executorId: String): Unit = {
+    rootPool.executorDecommission(executorId)
+    backend.reviveOffers()
+  }
+
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2ce1134..18684ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1083,6 +1083,12 @@ private[spark] class TaskSetManager(
     levels.toArray
   }
 
+  def executorDecommission(execId: String): Unit = {
+    recomputeLocality()
+    // Future consideration: if an executor is decommissioned it may make sense to add the current
+    // tasks to the spec exec queue.
+  }
+
   def recomputeLocality(): Unit = {
     // A zombie TaskSetManager may reach here while executorLost happens
     if (isZombie) return
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 2833908..8db0122 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -94,6 +94,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
     extends CoarseGrainedClusterMessage
 
+  case class DecommissionExecutor(executorId: String)  extends CoarseGrainedClusterMessage
+
   case class RemoveWorker(workerId: String, host: String, message: String)
     extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 63aa049..6e1efda 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -92,6 +92,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   private val executorsPendingLossReason = new HashSet[String]
 
+  // Executors which are being decommissioned
+  protected val executorsPendingDecommission = new HashSet[String]
+
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var rpHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
@@ -185,11 +188,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
         removeExecutor(executorId, reason)
 
+      case DecommissionExecutor(executorId) =>
+        logError(s"Received decommission executor message ${executorId}.")
+        decommissionExecutor(executorId)
+
+      case RemoveWorker(workerId, host, message) =>
+        removeWorker(workerId, host, message)
+
       case LaunchedExecutor(executorId) =>
         executorDataMap.get(executorId).foreach { data =>
           data.freeCores = data.totalCores
         }
         makeOffers(executorId)
+      case e =>
+        logError(s"Received unexpected message. ${e}")
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -257,6 +269,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)
 
+      case DecommissionExecutor(executorId) =>
+        logError(s"Received decommission executor message ${executorId}.")
+        decommissionExecutor(executorId)
+        context.reply(true)
+
       case RetrieveSparkAppConfig(resourceProfileId) =>
         val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
         val reply = SparkAppConfig(
@@ -265,6 +282,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           Option(delegationTokens.get()),
           rp)
         context.reply(reply)
+      case e =>
+        logError(s"Received unexpected ask ${e}")
     }
 
     // Make fake resource offers on all executors
@@ -365,6 +384,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
             executorsPendingLossReason -= executorId
+            executorsPendingDecommission -= executorId
             executorsPendingToRemove.remove(executorId).getOrElse(false)
           }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
@@ -390,6 +410,35 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     /**
+     * Mark a given executor as decommissioned and stop making resource offers for it.
+     */
+    private def decommissionExecutor(executorId: String): Boolean = {
+      val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+
+      if (shouldDisable) {
+        logInfo(s"Starting decommissioning executor $executorId.")
+        try {
+          scheduler.executorDecommission(executorId)
+        } catch {
+          case e: Exception =>
+            logError(s"Unexpected error during decommissioning ${e.toString}", e)
+        }
+        logInfo(s"Finished decommissioning executor $executorId.")
+      } else {
+        logInfo(s"Skipping decommissioning of executor $executorId.")
+      }
+      shouldDisable
+    }
+
+    /**
      * Stop making resource offers for the given executor. The executor is marked as lost with
      * the loss reason still pending.
      *
@@ -511,8 +560,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   protected def removeWorker(workerId: String, host: String, message: String): Unit = {
-    driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t =>
-      logError(t.getMessage, t))(ThreadUtils.sameThread)
+    driverEndpoint.send(RemoveWorker(workerId, host, message))
+  }
+
+  /**
+   * Called by subclasses when notified of a decommissioning executor.
+   */
+  private[spark] def decommissionExecutor(executorId: String): Unit = {
+    if (driverEndpoint != null) {
+      logInfo("Propegating executor decommission to driver.")
+      driverEndpoint.send(DecommissionExecutor(executorId))
+    }
   }
 
   def sufficientResourcesRegistered(): Boolean = true
@@ -543,7 +601,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   override def isExecutorActive(id: String): Boolean = synchronized {
     executorDataMap.contains(id) &&
       !executorsPendingToRemove.contains(id) &&
-      !executorsPendingLossReason.contains(id)
+      !executorsPendingLossReason.contains(id) &&
+      !executorsPendingDecommission.contains(id)
+
   }
 
   override def maxNumConcurrentTasks(): Int = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d91d78b..42c4646 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -174,6 +174,12 @@ private[spark] class StandaloneSchedulerBackend(
     removeExecutor(fullId.split("/")(1), reason)
   }
 
+  override def executorDecommissioned(fullId: String, message: String): Unit = {
+    logInfo("Asked to decommission executor")
+    decommissionExecutor(fullId.split("/")(1)) // second "/"-separated segment is the executor id
+    logInfo("Executor %s decommissioned: %s".format(fullId, message))
+  }
+
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {
     logInfo("Worker %s removed: %s".format(workerId, message))
     removeWorker(workerId, host, message)
diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
index 5a24965..230195d 100644
--- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
@@ -60,7 +60,7 @@ private[spark] object SignalUtils extends Logging {
     if (SystemUtils.IS_OS_UNIX) {
       try {
         val handler = handlers.getOrElseUpdate(signal, {
-          logInfo("Registered signal handler for " + signal)
+          logInfo("Registering signal handler for " + signal)
           new ActionHandler(new Signal(signal))
         })
         handler.register(action)
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index a1d3077..a3e39d7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.{ApplicationInfo, Master}
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.Utils
 
@@ -44,13 +44,13 @@ class AppClientSuite
     with Eventually
     with ScalaFutures {
   private val numWorkers = 2
-  private val conf = new SparkConf()
-  private val securityManager = new SecurityManager(conf)
 
+  private var conf: SparkConf = null
   private var masterRpcEnv: RpcEnv = null
   private var workerRpcEnvs: Seq[RpcEnv] = null
   private var master: Master = null
   private var workers: Seq[Worker] = null
+  private var securityManager: SecurityManager = null
 
   /**
    * Start the local cluster.
@@ -58,6 +58,8 @@ class AppClientSuite
    */
   override def beforeAll(): Unit = {
     super.beforeAll()
+    conf = new SparkConf().set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true")
+    securityManager = new SecurityManager(conf)
     masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
     workerRpcEnvs = (0 until numWorkers).map { i =>
       RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
@@ -111,8 +113,23 @@ class AppClientSuite
         assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed")
       }
 
+
+      // Save the executor id before decommissioning so we can kill it
+      val application = getApplications().head
+      val executors = application.executors
+      val executorId: String = executors.head._2.fullId
+
+      // Send a decommission self to all the workers
+      // Note: normally the worker would send this on their own.
+      workers.foreach(worker => worker.decommissionSelf())
+
+      // Decommissioning is async.
+      eventually(timeout(1.seconds), interval(10.millis)) {
+        // We only record decommissioning for the executor we've requested
+        assert(ci.listener.execDecommissionedList.size === 1)
+      }
+
       // Send request to kill executor, verify request was made
-      val executorId: String = getApplications().head.executors.head._2.fullId
       whenReady(
         ci.client.killExecutors(Seq(executorId)),
         timeout(10.seconds),
@@ -120,6 +137,15 @@ class AppClientSuite
         assert(acknowledged)
       }
 
+      // Verify that asking for executors on the decommissioned workers fails
+      whenReady(
+        ci.client.requestTotalExecutors(numExecutorsRequested),
+        timeout(10.seconds),
+        interval(10.millis)) { acknowledged =>
+        assert(acknowledged)
+      }
+      assert(getApplications().head.executors.size === 0)
+
       // Issue stop command for Client to disconnect from Master
       ci.client.stop()
 
@@ -189,6 +215,7 @@ class AppClientSuite
     val deadReasonList = new ConcurrentLinkedQueue[String]()
     val execAddedList = new ConcurrentLinkedQueue[String]()
     val execRemovedList = new ConcurrentLinkedQueue[String]()
+    val execDecommissionedList = new ConcurrentLinkedQueue[String]()
 
     def connected(id: String): Unit = {
       connectedIdList.add(id)
@@ -218,6 +245,10 @@ class AppClientSuite
       execRemovedList.add(id)
     }
 
+    def executorDecommissioned(id: String, message: String): Unit = {
+      execDecommissionedList.add(id)
+    }
+
     def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 101e60c..e40b63f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -167,6 +167,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
+    override def executorDecommission(executorId: String) = {}
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
@@ -707,6 +708,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
           accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
           blockManagerId: BlockManagerId,
           executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true
+      override def executorDecommission(executorId: String): Unit = {}
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
       override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
       override def applicationAttemptId(): Option[String] = None
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 4e71ec1..9f593e0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -89,6 +89,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
+  override def executorDecommission(executorId: String): Unit = {}
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   override def applicationAttemptId(): Option[String] = None
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
new file mode 100644
index 0000000..15733b0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.Semaphore
+
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
+
+/**
+ * Checks that decommissioning every executor of a local-cluster application lets running
+ * tasks finish while keeping new tasks from being scheduled on those executors.
+ */
+class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+    sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
+  }
+
+  test("verify task with no decommissioning works as expected") {
+    // Baseline: the same sleepy job runs to completion when nothing is decommissioned.
+    val input = sc.parallelize(1 to 10)
+    input.count()
+    val sleepyRdd = input.mapPartitions { x => Thread.sleep(100); x }
+    assert(sleepyRdd.count() === 10)
+  }
+
+  test("verify a task with all workers decommissioned succeeds") {
+    val input = sc.parallelize(1 to 10)
+    // Do a count to wait for the executors to be registered.
+    input.count()
+    val sleepyRdd = input.mapPartitions { x => Thread.sleep(50); x }
+    // Released once per started task so the test can tell the job is running.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sem.release()
+      }
+    })
+    // Start the task.
+    val asyncCount = sleepyRdd.countAsync()
+    // Wait for the job to have started
+    sem.acquire(1)
+    // Decommission all the executors, this should not halt the current task.
+    // decom.sh message passing is tested manually.
+    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+    val execs = sched.getExecutorIds()
+    execs.foreach(execId => sched.decommissionExecutor(execId))
+    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 2.seconds)
+    assert(asyncCountResult === 10)
+    // Decommissioned executors are no longer active, so a job submitted now is
+    // expected to still be pending when the wait below times out.
+    val postDecommissioned = input.map(x => x)
+    val postDecomAsyncCount = postDecommissioned.countAsync()
+    intercept[java.util.concurrent.TimeoutException] {
+      ThreadUtils.awaitResult(postDecomAsyncCount, 2.seconds)
+    }
+    assert(postDecomAsyncCount.isCompleted === false,
+      "A task launched after all executors were decommissioned should not complete")
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 09943b7..f42f341 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -55,6 +55,9 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
       }
   }
 
+  def workerDecommissioning: Boolean =
+    sparkConf.get(org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED)
+
   def nodeSelector: Map[String, String] =
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 6a26df2..f575241 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
@@ -33,7 +34,7 @@ import org.apache.spark.util.Utils
 private[spark] class BasicExecutorFeatureStep(
     kubernetesConf: KubernetesExecutorConf,
     secMgr: SecurityManager)
-  extends KubernetesFeatureConfigStep {
+  extends KubernetesFeatureConfigStep with Logging {
 
   // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
   private val executorContainerImage = kubernetesConf
@@ -186,6 +187,21 @@ private[spark] class BasicExecutorFeatureStep(
           .endResources()
         .build()
     }.getOrElse(executorContainer)
+    val containerWithLifecycle =
+      if (!kubernetesConf.workerDecommissioning) {
+        logInfo("Decommissioning not enabled, skipping shutdown script")
+        containerWithLimitCores
+      } else {
+        logInfo("Adding decommission script to lifecycle")
+        new ContainerBuilder(containerWithLimitCores).withNewLifecycle()
+          .withNewPreStop()
+            .withNewExec()
+              .addToCommand("/opt/decom.sh")
+            .endExec()
+          .endPreStop()
+          .endLifecycle()
+          .build()
+      }
     val ownerReference = kubernetesConf.driverPod.map { pod =>
       new OwnerReferenceBuilder()
         .withController(true)
@@ -213,6 +229,6 @@ private[spark] class BasicExecutorFeatureStep(
     kubernetesConf.get(KUBERNETES_EXECUTOR_SCHEDULER_NAME)
       .foreach(executorPod.getSpec.setSchedulerName)
 
-    SparkPod(executorPod, containerWithLimitCores)
+    SparkPod(executorPod, containerWithLifecycle)
   }
 }
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
index 6ed37fc..cc65a7d 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
@@ -30,7 +30,7 @@ ARG spark_uid=185
 RUN set -ex && \
     apt-get update && \
     ln -s /lib /lib64 && \
-    apt install -y bash tini libc6 libpam-modules krb5-user libnss3 && \
+    apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps && \
     mkdir -p /opt/spark && \
     mkdir -p /opt/spark/examples && \
     mkdir -p /opt/spark/work-dir && \
@@ -45,6 +45,7 @@ COPY jars /opt/spark/jars
 COPY bin /opt/spark/bin
 COPY sbin /opt/spark/sbin
 COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
+COPY kubernetes/dockerfiles/spark/decom.sh /opt/
 COPY examples /opt/spark/examples
 COPY kubernetes/tests /opt/spark/tests
 COPY data /opt/spark/data
@@ -53,6 +54,7 @@ ENV SPARK_HOME /opt/spark
 
 WORKDIR /opt/spark/work-dir
 RUN chmod g+w /opt/spark/work-dir
+RUN chmod a+x /opt/decom.sh
 
 ENTRYPOINT [ "/opt/entrypoint.sh" ]
 
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
new file mode 100755
index 0000000..8a5208d
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+set -ex
+echo "Asked to decommission"
+# Find the pid to signal
+date | tee -a "${LOG:-/dev/null}"
+WORKER_PID=$(ps -o pid -C java | tail -n 1| awk '{ sub(/^[ \t]+/, ""); print }')
+echo "Using worker pid $WORKER_PID"
+kill -s SIGPWR "${WORKER_PID}"
+# For now we expect this to timeout, since we don't start exiting the backend.
+echo "Waiting for worker pid to exit"
+# Stop blocking cleanup if the worker exits; the expected timeout (exit 124) must not trip set -e.
+timeout 60 tail --pid="${WORKER_PID}" -f /dev/null || true
+date
+echo "Done"
+date
+sleep 30
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 6ee3523..05ab782 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -30,9 +30,9 @@ set -e
 # If there is no passwd entry for the container UID, attempt to create one
 if [ -z "$uidentry" ] ; then
     if [ -w /etc/passwd ] ; then
-        echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
+	echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
     else
-        echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
+	echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
     fi
 fi
 
@@ -59,7 +59,7 @@ fi
 # If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
 # It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s.
 if [ -n ${HADOOP_HOME}  ] && [ -z ${SPARK_DIST_CLASSPATH}  ]; then
-  export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)  
+  export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
 fi
 
 if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
index 607bb24..292abe9 100755
--- a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
+++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
@@ -16,7 +16,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-set -xo errexit
+set -exo errexit
 TEST_ROOT_DIR=$(git rev-parse --show-toplevel)
 
 DEPLOY_MODE="minikube"
@@ -42,6 +42,9 @@ SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/nu
     | grep -v "WARNING"\
     | tail -n 1)
 
+export SCALA_VERSION
+echo $SCALA_VERSION
+
 # Parse arguments
 while (( "$#" )); do
   case $1 in
@@ -110,7 +113,8 @@ while (( "$#" )); do
       shift
       ;;
     *)
-      break
+      echo "Unexpected command line flag $2 $1."
+      exit 1
       ;;
   esac
   shift
@@ -164,6 +168,7 @@ properties+=(
   -Dspark.kubernetes.test.jvmImage=$JVM_IMAGE_NAME
   -Dspark.kubernetes.test.pythonImage=$PYTHON_IMAGE_NAME
   -Dspark.kubernetes.test.rImage=$R_IMAGE_NAME
+  -Dlog4j.logger.org.apache.spark=DEBUG
 )
 
 $TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -Pkubernetes -Pkubernetes-integration-tests ${properties[@]}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
new file mode 100644
index 0000000..f5eab6e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.apache.spark.internal.config.Worker
+
+private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
+
+  import KubernetesSuite.k8sTestTag
+  import DecommissionSuite._
+
+  test("Test basic decommissioning", k8sTestTag) {
+    // Log lines the run must produce for the test to pass.
+    val expectedLogs = Seq("decommissioning executor", "Finished waiting, stopping Spark")
+    sparkAppConf
+      .set(Worker.WORKER_DECOMMISSION_ENABLED.key, "true")
+      .set("spark.kubernetes.pyspark.pythonVersion", "3")
+      .set("spark.kubernetes.container.image", pyImage)
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_DECOMISSIONING,
+      mainClass = "",
+      expectedLogOnCompletion = expectedLogs,
+      appArgs = Array.empty[String],
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      decommissioningTest = true)
+  }
+}
+
+private[spark] object DecommissionSuite {
+  val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/"
+  val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py"
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 0d4fccc..61e1f27 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -42,7 +42,9 @@ import org.apache.spark.internal.config._
 class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
   with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
-  with DepsTestsSuite with RTestsSuite with Logging with Eventually with Matchers {
+  with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually
+  with Matchers {
+
 
   import KubernetesSuite._
 
@@ -254,6 +256,7 @@ class KubernetesSuite extends SparkFunSuite
     }
   }
 
+  // scalastyle:off argcount
   protected def runSparkApplicationAndVerifyCompletion(
       appResource: String,
       mainClass: String,
@@ -264,60 +267,120 @@ class KubernetesSuite extends SparkFunSuite
       appLocator: String,
       isJVM: Boolean,
       pyFiles: Option[String] = None,
-      executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
+      executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
+      decommissioningTest: Boolean = false): Unit = {
+
+  // scalastyle:on argcount
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
       appArgs = appArgs)
-    SparkAppLauncher.launch(
-      appArguments,
-      sparkAppConf,
-      TIMEOUT.value.toSeconds.toInt,
-      sparkHomeDir,
-      isJVM,
-      pyFiles)
 
-    val driverPod = kubernetesTestComponents.kubernetesClient
-      .pods()
-      .withLabel("spark-app-locator", appLocator)
-      .withLabel("spark-role", "driver")
-      .list()
-      .getItems
-      .get(0)
-    driverPodChecker(driverPod)
     val execPods = scala.collection.mutable.Map[String, Pod]()
+    val (patienceInterval, patienceTimeout) = {
+      executorPatience match {
+        case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
+        case _ => (INTERVAL, TIMEOUT)
+      }
+    }
+    // True when the named pod exists and its first "Ready" condition reports status "True".
+    def checkPodReady(namespace: String, name: String): Boolean = {
+      val execPod = kubernetesTestComponents.kubernetesClient
+        .pods()
+        .inNamespace(namespace)
+        .withName(name)
+        .get()
+      // get() yields null once the pod is gone (e.g. after the decommission delete),
+      // and a pod may not have a status yet; treat both as "not ready".
+      val conditions = Option(execPod).flatMap(p => Option(p.getStatus))
+        .map(_.getConditions().asScala).getOrElse(Nil)
+      conditions
+        .find(cond => cond.getType() == "Ready")
+        .exists(cond => cond.getStatus() == "True")
+    }
     val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "executor")
       .watch(new Watcher[Pod] {
-        logInfo("Beginning watch of executors")
+        logDebug("Beginning watch of executors")
         override def onClose(cause: KubernetesClientException): Unit =
           logInfo("Ending watch of executors")
         override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
           val name = resource.getMetadata.getName
+          val namespace = resource.getMetadata().getNamespace()
           action match {
-            case Action.ADDED | Action.MODIFIED =>
+            case Action.MODIFIED =>
+              execPods(name) = resource
+            case Action.ADDED =>
+              logDebug(s"Add event received for $name.")
               execPods(name) = resource
+              // If testing decommissioning start a thread to simulate
+              // decommissioning.
+              if (decommissioningTest && execPods.size == 1) {
+                // Wait for all the containers in the pod to be running
+                logDebug("Waiting for first pod to become OK prior to deletion")
+                Eventually.eventually(patienceTimeout, patienceInterval) {
+                  val result = checkPodReady(namespace, name)
+                  result shouldBe (true)
+                }
+                // Sleep a small interval to allow execution of job
+                logDebug("Sleeping before killing pod.")
+                Thread.sleep(2000)
+                // Delete the pod to simulate cluster scale down/migration.
+                val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name)
+                pod.delete()
+                logDebug(s"Triggered pod decom/delete: $name deleted")
+              }
             case Action.DELETED | Action.ERROR =>
               execPods.remove(name)
           }
         }
       })
 
-    val (patienceInterval, patienceTimeout) = {
-      executorPatience match {
-        case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
-        case _ => (INTERVAL, TIMEOUT)
-      }
-    }
+    logDebug("Starting Spark K8s job")
+    SparkAppLauncher.launch(
+      appArguments,
+      sparkAppConf,
+      TIMEOUT.value.toSeconds.toInt,
+      sparkHomeDir,
+      isJVM,
+      pyFiles)
 
+    val driverPod = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-role", "driver")
+      .list()
+      .getItems
+      .get(0)
+
+    driverPodChecker(driverPod)
+    // If we're testing decommissioning we delete all the executors, but we should have
+    // an executor at some point.
     Eventually.eventually(patienceTimeout, patienceInterval) {
       execPods.values.nonEmpty should be (true)
     }
+    // If decommissioning we need to wait and check the executors were removed
+    if (decommissioningTest) {
+      // Sleep a small interval to ensure everything is registered.
+      Thread.sleep(100)
+      // Wait until at least one executor pod is ready or all executor pods are gone
+      Eventually.eventually(patienceTimeout, patienceInterval) {
+        val anyReadyPods = ! execPods.map{
+          case (name, resource) =>
+            (name, resource.getMetadata().getNamespace())
+        }.filter{
+          case (name, namespace) => checkPodReady(namespace, name)
+        }.isEmpty
+        val podsEmpty = execPods.values.isEmpty
+        val podsReadyOrDead = anyReadyPods || podsEmpty
+        podsReadyOrDead shouldBe (true)
+      }
+    }
     execWatcher.close()
     execPods.values.foreach(executorPodChecker(_))
-    Eventually.eventually(TIMEOUT, patienceInterval) {
+    Eventually.eventually(patienceTimeout, patienceInterval) {
       expectedLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
@@ -425,5 +488,5 @@ private[spark] object KubernetesSuite {
   val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
   val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
-  val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+  val INTERVAL = PatienceConfiguration.Interval(Span(1, Seconds))
 }
diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
new file mode 100644
index 0000000..f68f24d
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+import time
+
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+    """
+        Usage: decommissioning
+    """
+    print("Starting decom test")
+    spark = SparkSession \
+        .builder \
+        .appName("PyMemoryTest") \
+        .getOrCreate()
+    sc = spark._sc
+    rdd = sc.parallelize(range(10))
+    rdd.collect()
+    print("Waiting to give nodes time to finish.")
+    time.sleep(5)
+    rdd.collect()
+    print("Waiting some more....")
+    time.sleep(10)
+    rdd.collect()
+    print("Finished waiting, stopping Spark.")
+    spark.stop()
+    print("Done, exiting Python")
+    sys.exit(0)
diff --git a/sbin/decommission-slave.sh b/sbin/decommission-slave.sh
new file mode 100644
index 0000000..4bbf257
--- /dev/null
+++ b/sbin/decommission-slave.sh
@@ -0,0 +1,57 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A shell script to decommission all workers on a single slave
+#
+# Environment variables
+#
+#   SPARK_WORKER_INSTANCES The number of worker instances that should be
+#                          running on this slave.  Default is 1.
+
+# Usage: decommission-slave.sh [--block-until-exit]
+#   Decommissions all workers on this slave machine
+
+set -ex
+
+if [ -z "${SPARK_HOME}" ]; then
+  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
+
+. "${SPARK_HOME}/sbin/spark-config.sh"
+
+. "${SPARK_HOME}/bin/load-spark-env.sh"
+
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+  "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker 1
+else
+  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+    "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker $(( $i + 1 ))
+  done
+fi
+
+# Check if --block-until-exit is set.
+# This is done for systems which block on the decommissioning script and, when it
+# exits, shut down the entire system (e.g. K8s).
+if [ "$1" == "--block-until-exit" ]; then
+  shift
+  # For now we only block on the 0th instance if there are multiple instances.
+  instance=$1
+  pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"
+  wait $pid
+fi
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index 6de67e0..81f2fd4 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -215,6 +215,21 @@ case $option in
     fi
     ;;
 
+  (decommission)
+
+    if [ -f $pid ]; then
+      TARGET_ID="$(cat "$pid")"
+      if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
+        echo "decommissioning $command"
+        kill -s SIGPWR "$TARGET_ID"
+      else
+        echo "no $command to decommission"
+      fi
+    else
+      echo "no $command to decommission"
+    fi
+    ;;
+
   (status)
 
     if [ -f $pid ]; then


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org