Posted to commits@spark.apache.org by ho...@apache.org on 2020/02/14 20:37:58 UTC
[spark] branch master updated: [SPARK-20628][CORE][K8S] Start to
improve Spark decommissioning & preemption support
This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d273a2b [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
d273a2b is described below
commit d273a2bb0fac452a97f5670edd69d3e452e3e57e
Author: Holden Karau <hk...@apple.com>
AuthorDate: Fri Feb 14 12:36:52 2020 -0800
[SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
This PR is based on an existing/previous PR - https://github.com/apache/spark/pull/19045
### What changes were proposed in this pull request?
This change adds a decommissioning state that we can enter when the cloud provider/scheduler lets us know we aren't going to be removed immediately but instead will be removed soon. This concept fits nicely in K8s and also with spot instances on AWS / preemptible instances, all of which can give us notice that our host is going away. For now we simply stop scheduling jobs; in the future we could perform some kind of migration of data during scale-down, or at least stop accepting new [...]
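The core idea above can be sketched as a simple flag check. This is a minimal illustration only; the class and method names below are hypothetical and do not match the real Spark classes, which appear in the diff further down.

```scala
// Minimal sketch of the decommissioning idea: once an executor learns its
// host is going away soon, it flips a flag and refuses new work while
// letting already-running tasks continue.
class SketchExecutor {
  @volatile private var decommissioned = false
  private var launched = List.empty[String]

  // Called when the cloud provider/scheduler signals an impending removal.
  def decommission(): Unit = { decommissioned = true }

  // Returns true if the task was accepted for execution.
  def tryLaunch(taskId: String): Boolean = {
    if (decommissioned) {
      false // host is going away; stop taking on new work
    } else {
      launched = taskId :: launched
      true
    }
  }

  def launchedTasks: List[String] = launched
}
```

Note the design choice mirrored here: decommissioning is advisory (stop scheduling) rather than a hard stop, so in-flight tasks are not interrupted.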
There is a design document at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?usp=sharing
### Why are the changes needed?
With more move to preemptible multi-tenancy, serverless environments, and spot-instances better handling of node scale down is required.
### Does this PR introduce any user-facing change?
There is no API change, however an additional configuration flag is added to enable/disable this behaviour.
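Enabling the new behaviour might look like the fragment below. The flag name is taken from this patch (it defaults to false); the surrounding code is a plain illustration and assumes Spark is on the classpath.

```scala
import org.apache.spark.SparkConf

// Opt in to worker decommissioning, which is disabled by default.
val conf = new SparkConf()
  .set("spark.worker.decommission.enabled", "true")
```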
### How was this patch tested?
New integration tests in the Spark K8s integration testing. An extension of the AppClientSuite tests decommissioning separately from K8s.
Closes #26440 from holdenk/SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4.
Lead-authored-by: Holden Karau <hk...@apple.com>
Co-authored-by: Holden Karau <ho...@pigscanfly.ca>
Signed-off-by: Holden Karau <hk...@apple.com>
---
.../org/apache/spark/deploy/DeployMessage.scala | 11 ++
.../org/apache/spark/deploy/ExecutorState.scala | 8 +-
.../spark/deploy/client/StandaloneAppClient.scala | 2 +
.../client/StandaloneAppClientListener.scala | 2 +
.../org/apache/spark/deploy/master/Master.scala | 31 ++++++
.../org/apache/spark/deploy/worker/Worker.scala | 26 +++++
.../executor/CoarseGrainedExecutorBackend.scala | 39 ++++++-
.../scala/org/apache/spark/executor/Executor.scala | 16 +++
.../org/apache/spark/internal/config/Worker.scala | 5 +
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +
.../spark/scheduler/ExecutorLossReason.scala | 8 ++
.../scala/org/apache/spark/scheduler/Pool.scala | 4 +
.../org/apache/spark/scheduler/Schedulable.scala | 1 +
.../apache/spark/scheduler/SchedulerBackend.scala | 3 +
.../org/apache/spark/scheduler/TaskScheduler.scala | 5 +
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 5 +
.../apache/spark/scheduler/TaskSetManager.scala | 6 ++
.../cluster/CoarseGrainedClusterMessage.scala | 2 +
.../cluster/CoarseGrainedSchedulerBackend.scala | 66 +++++++++++-
.../cluster/StandaloneSchedulerBackend.scala | 6 ++
.../scala/org/apache/spark/util/SignalUtils.scala | 2 +-
.../spark/deploy/client/AppClientSuite.scala | 39 ++++++-
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +
.../scheduler/ExternalClusterManagerSuite.scala | 1 +
.../spark/scheduler/WorkerDecommissionSuite.scala | 84 +++++++++++++++
.../apache/spark/deploy/k8s/KubernetesConf.scala | 3 +
.../k8s/features/BasicExecutorFeatureStep.scala | 20 +++-
.../docker/src/main/dockerfiles/spark/Dockerfile | 4 +-
.../docker/src/main/dockerfiles/spark/decom.sh | 35 ++++++
.../src/main/dockerfiles/spark/entrypoint.sh | 6 +-
.../dev/dev-run-integration-tests.sh | 9 +-
.../k8s/integrationtest/DecommissionSuite.scala | 49 +++++++++
.../k8s/integrationtest/KubernetesSuite.scala | 117 ++++++++++++++++-----
.../integration-tests/tests/decommissioning.py | 45 ++++++++
sbin/decommission-slave.sh | 57 ++++++++++
sbin/spark-daemon.sh | 15 +++
36 files changed, 690 insertions(+), 46 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index fba371d..18305ad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
assert (port > 0)
}
+ /**
+ * @param id the worker id
+ * @param worker the worker endpoint ref
+ */
+ case class WorkerDecommission(
+ id: String,
+ worker: RpcEndpointRef)
+ extends DeployMessage
+
case class ExecutorStateChanged(
appId: String,
execId: Int,
@@ -149,6 +158,8 @@ private[deploy] object DeployMessages {
case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
+ case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.
+
// AppClient to Master
case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index 69c98e2..0751bcf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,13 @@ package org.apache.spark.deploy
private[deploy] object ExecutorState extends Enumeration {
- val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+ val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value
type ExecutorState = Value
- def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
+  // DECOMMISSIONED isn't listed as finished: we don't want to remove the executor from
+  // the worker while it still exists, but we do want to avoid scheduling new tasks on it.
+ private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)
+
+ def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state)
}
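The key property of the new state is that DECOMMISSIONED is live but non-finished. A self-contained sketch of that enumeration logic (hypothetical name, mirroring the change above):

```scala
// DECOMMISSIONED is deliberately excluded from the finished states, so the
// worker keeps the executor around while the scheduler stops offering it
// new tasks.
object SketchExecutorState extends Enumeration {
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value

  private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)

  def isFinished(state: Value): Boolean = finishedStates.contains(state)
}
```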
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 8f17159..eedf5e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -180,6 +180,8 @@ private[spark] class StandaloneAppClient(
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
+ } else if (state == ExecutorState.DECOMMISSIONED) {
+ listener.executorDecommissioned(fullId, message.getOrElse(""))
}
case WorkerRemoved(id, host, message) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index d8bc1a8..2e38a68 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener {
def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+ def executorDecommissioned(fullId: String, message: String): Unit
+
def workerRemoved(workerId: String, host: String, message: String): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 8d3795c..71df5df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -243,6 +243,15 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
+ case WorkerDecommission(id, workerRef) =>
+ logInfo("Recording worker %s decommissioning".format(id))
+ if (state == RecoveryState.STANDBY) {
+ workerRef.send(MasterInStandby)
+ } else {
+ // We use foreach since get gives us an option and we can skip the failures.
+ idToWorker.get(id).foreach(decommissionWorker)
+ }
+
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
masterAddress, resources) =>
@@ -313,7 +322,9 @@ private[deploy] class Master(
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
+ // We also don't count failures from decommissioned workers since they are "expected."
if (!normalExit
+ && oldState != ExecutorState.DECOMMISSIONED
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
@@ -850,6 +861,26 @@ private[deploy] class Master(
true
}
+ private def decommissionWorker(worker: WorkerInfo): Unit = {
+ if (worker.state != WorkerState.DECOMMISSIONED) {
+ logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
+ worker.setState(WorkerState.DECOMMISSIONED)
+ for (exec <- worker.executors.values) {
+        logInfo("Notifying app of decommissioned executor")
+ exec.application.driver.send(ExecutorUpdated(
+ exec.id, ExecutorState.DECOMMISSIONED,
+ Some("worker decommissioned"), None, workerLost = false))
+ exec.state = ExecutorState.DECOMMISSIONED
+ exec.application.removeExecutor(exec)
+ }
+      // On recovery do not re-register a decommissioned worker
+ persistenceEngine.removeWorker(worker)
+ } else {
+ logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned".
+ format(worker.id, worker.host, worker.port))
+ }
+ }
+
private def removeWorker(worker: WorkerInfo, msg: String): Unit = {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
worker.setState(WorkerState.DEAD)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4be495a..d988bce 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -67,6 +67,14 @@ private[deploy] class Worker(
Utils.checkHost(host)
assert (port > 0)
+ // If worker decommissioning is enabled register a handler on PWR to shutdown.
+ if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+ logInfo("Registering SIGPWR handler to trigger decommissioning.")
+ SignalUtils.register("PWR")(decommissionSelf)
+ } else {
+ logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
+ }
+
// A scheduled executor used to send messages at the specified time.
private val forwardMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
@@ -128,6 +136,7 @@ private[deploy] class Worker(
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
+ private var decommissioned = false
private val workerId = generateWorkerId()
private val sparkHome =
if (sys.props.contains(IS_TESTING.key)) {
@@ -549,6 +558,8 @@ private[deploy] class Worker(
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+ } else if (decommissioned) {
+ logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -672,6 +683,9 @@ private[deploy] class Worker(
case ApplicationFinished(id) =>
finishedApps += id
maybeCleanupApplication(id)
+
+ case DecommissionSelf =>
+ decommissionSelf()
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -771,6 +785,18 @@ private[deploy] class Worker(
}
}
+ private[deploy] def decommissionSelf(): Boolean = {
+ if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+ logDebug("Decommissioning self")
+ decommissioned = true
+ sendToMaster(WorkerDecommission(workerId, self))
+ } else {
+ logWarning("Asked to decommission self, but decommissioning not enabled")
+ }
+    // Return true since this can be called as a signal handler
+ true
+ }
+
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 25c5b98..faf03a6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -43,7 +43,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, ThreadUtils, Utils}
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -64,6 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
+ @volatile private var decommissioned = false
@volatile var driver: Option[RpcEndpointRef] = None
// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@@ -80,6 +81,9 @@ private[spark] class CoarseGrainedExecutorBackend(
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
override def onStart(): Unit = {
+ logInfo("Registering PWR handler.")
+ SignalUtils.register("PWR")(decommissionSelf)
+
logInfo("Connecting to driver: " + driverUrl)
try {
_resources = parseOrFindResources(resourcesFileOpt)
@@ -160,6 +164,16 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
+ if (decommissioned) {
+ logError("Asked to launch a task while decommissioned.")
+ driver match {
+ case Some(endpoint) =>
+ logInfo("Sending DecommissionExecutor to driver.")
+ endpoint.send(DecommissionExecutor(executorId))
+ case _ =>
+ logError("No registered driver to send Decommission to.")
+ }
+ }
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
@@ -242,6 +256,29 @@ private[spark] class CoarseGrainedExecutorBackend(
System.exit(code)
}
+
+ private def decommissionSelf(): Boolean = {
+ logInfo("Decommissioning self w/sync")
+ try {
+ decommissioned = true
+      // Tell the driver we are decommissioned so it stops trying to schedule us
+ if (driver.nonEmpty) {
+ driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+ } else {
+ logError("No driver to message decommissioning.")
+ }
+ if (executor != null) {
+ executor.decommission()
+ }
+ logInfo("Done decommissioning self.")
+ // Return true since we are handling a signal
+ true
+ } catch {
+ case e: Exception =>
+ logError(s"Error ${e} during attempt to decommission self")
+ false
+ }
+ }
}
private[spark] object CoarseGrainedExecutorBackend extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 8aeb16f..2bfa1ce 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -216,16 +216,32 @@ private[spark] class Executor(
*/
private var heartbeatFailures = 0
+ /**
+   * Flag to prevent launching new tasks while decommissioned. There could be a race condition
+   * accessing this, but decommissioning is only intended as a best effort, not a hard stop.
+ */
+ private var decommissioned = false
+
heartbeater.start()
metricsPoller.start()
private[executor] def numRunningTasks: Int = runningTasks.size()
+ /**
+ * Mark an executor for decommissioning and avoid launching new tasks.
+ */
+ private[spark] def decommission(): Unit = {
+ decommissioned = true
+ }
+
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
+ if (decommissioned) {
+ log.error(s"Launching a task while in decommissioned state.")
+ }
}
def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index f1eaae2..2b175c1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -71,4 +71,9 @@ private[spark] object Worker {
ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
.intConf
.createWithDefault(100)
+
+ private[spark] val WORKER_DECOMMISSION_ENABLED =
+ ConfigBuilder("spark.worker.decommission.enabled")
+ .booleanConf
+ .createWithDefault(false)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 64d2032..a26b579 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -361,6 +361,7 @@ abstract class RDD[T: ClassTag](
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
+ // Block hit.
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
@@ -374,6 +375,7 @@ abstract class RDD[T: ClassTag](
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
+ // Need to compute the block.
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 46a35b6..ee31093 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
private[spark]
case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
extends ExecutorLossReason(_message)
+
+/**
+ * A loss reason that means the executor is marked for decommissioning.
+ *
+ * This is used by the task scheduler to remove state associated with the executor, but
+ * not to fail any tasks that were running on the executor before it is "fully" lost.
+ */
+private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 80805df..2e2851e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -88,6 +88,10 @@ private[spark] class Pool(
schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}
+ override def executorDecommission(executorId: String): Unit = {
+ schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
+ }
+
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
var shouldRevive = false
for (schedulable <- schedulableQueue.asScala) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index b6f88ed..8cc239c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -43,6 +43,7 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
+ def executorDecommission(executorId: String): Unit
def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 9159d2a..4752353 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -27,6 +27,9 @@ private[spark] trait SchedulerBackend {
def start(): Unit
def stop(): Unit
+ /**
+ * Update the current offers and schedule tasks
+ */
def reviveOffers(): Unit
def defaultParallelism(): Int
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 15f5d20..e9e638a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -99,6 +99,11 @@ private[spark] trait TaskScheduler {
def applicationId(): String = appId
/**
+ * Process a decommissioning executor.
+ */
+ def executorDecommission(executorId: String): Unit
+
+ /**
* Process a lost executor
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bf92081..1b197c4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -734,6 +734,11 @@ private[spark] class TaskSchedulerImpl(
}
}
+ override def executorDecommission(executorId: String): Unit = {
+ rootPool.executorDecommission(executorId)
+ backend.reviveOffers()
+ }
+
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2ce1134..18684ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1083,6 +1083,12 @@ private[spark] class TaskSetManager(
levels.toArray
}
+ def executorDecommission(execId: String): Unit = {
+ recomputeLocality()
+    // Future consideration: if an executor is decommissioned it may make sense to add its
+    // current tasks to the speculative execution queue.
+ }
+
def recomputeLocality(): Unit = {
// A zombie TaskSetManager may reach here while executorLost happens
if (isZombie) return
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 2833908..8db0122 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -94,6 +94,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
extends CoarseGrainedClusterMessage
+ case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage
+
case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 63aa049..6e1efda 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -92,6 +92,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
private val executorsPendingLossReason = new HashSet[String]
+ // Executors which are being decommissioned
+ protected val executorsPendingDecommission = new HashSet[String]
+
// A map of ResourceProfile id to map of hostname with its possible task number running on it
@GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var rpHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
@@ -185,11 +188,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)
+ case DecommissionExecutor(executorId) =>
+ logError(s"Received decommission executor message ${executorId}.")
+ decommissionExecutor(executorId)
+
+ case RemoveWorker(workerId, host, message) =>
+ removeWorker(workerId, host, message)
+
case LaunchedExecutor(executorId) =>
executorDataMap.get(executorId).foreach { data =>
data.freeCores = data.totalCores
}
makeOffers(executorId)
+ case e =>
+ logError(s"Received unexpected message. ${e}")
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -257,6 +269,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
removeWorker(workerId, host, message)
context.reply(true)
+ case DecommissionExecutor(executorId) =>
+ logError(s"Received decommission executor message ${executorId}.")
+ decommissionExecutor(executorId)
+ context.reply(true)
+
case RetrieveSparkAppConfig(resourceProfileId) =>
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
val reply = SparkAppConfig(
@@ -265,6 +282,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
Option(delegationTokens.get()),
rp)
context.reply(reply)
+ case e =>
+ logError(s"Received unexpected ask ${e}")
}
// Make fake resource offers on all executors
@@ -365,6 +384,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
+ executorsPendingDecommission -= executorId
executorsPendingToRemove.remove(executorId).getOrElse(false)
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
@@ -390,6 +410,35 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
/**
+ * Mark a given executor as decommissioned and stop making resource offers for it.
+ */
+ private def decommissionExecutor(executorId: String): Boolean = {
+ val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
+ // Only bother decommissioning executors which are alive.
+ if (isExecutorActive(executorId)) {
+ executorsPendingDecommission += executorId
+ true
+ } else {
+ false
+ }
+ }
+
+ if (shouldDisable) {
+ logInfo(s"Starting decommissioning executor $executorId.")
+ try {
+ scheduler.executorDecommission(executorId)
+ } catch {
+ case e: Exception =>
+ logError(s"Unexpected error during decommissioning ${e.toString}", e)
+ }
+ logInfo(s"Finished decommissioning executor $executorId.")
+ } else {
+ logInfo(s"Skipping decommissioning of executor $executorId.")
+ }
+ shouldDisable
+ }
+
+ /**
* Stop making resource offers for the given executor. The executor is marked as lost with
* the loss reason still pending.
*
@@ -511,8 +560,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
protected def removeWorker(workerId: String, host: String, message: String): Unit = {
- driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t =>
- logError(t.getMessage, t))(ThreadUtils.sameThread)
+ driverEndpoint.send(RemoveWorker(workerId, host, message))
+ }
+
+ /**
+ * Called by subclasses when notified of a decommissioning executor.
+ */
+ private[spark] def decommissionExecutor(executorId: String): Unit = {
+ if (driverEndpoint != null) {
+      logInfo("Propagating executor decommission to driver.")
+ driverEndpoint.send(DecommissionExecutor(executorId))
+ }
}
def sufficientResourcesRegistered(): Boolean = true
@@ -543,7 +601,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def isExecutorActive(id: String): Boolean = synchronized {
executorDataMap.contains(id) &&
!executorsPendingToRemove.contains(id) &&
- !executorsPendingLossReason.contains(id)
+ !executorsPendingLossReason.contains(id) &&
+ !executorsPendingDecommission.contains(id)
+
}
override def maxNumConcurrentTasks(): Int = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d91d78b..42c4646 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -174,6 +174,12 @@ private[spark] class StandaloneSchedulerBackend(
removeExecutor(fullId.split("/")(1), reason)
}
+ override def executorDecommissioned(fullId: String, message: String) {
+ logInfo("Asked to decommission executor")
+ decommissionExecutor(fullId.split("/")(1))
+ logInfo("Executor %s decommissioned: %s".format(fullId, message))
+ }
+
override def workerRemoved(workerId: String, host: String, message: String): Unit = {
logInfo("Worker %s removed: %s".format(workerId, message))
removeWorker(workerId, host, message)
diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
index 5a24965..230195d 100644
--- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
@@ -60,7 +60,7 @@ private[spark] object SignalUtils extends Logging {
if (SystemUtils.IS_OS_UNIX) {
try {
val handler = handlers.getOrElseUpdate(signal, {
- logInfo("Registered signal handler for " + signal)
+ logInfo("Registering signal handler for " + signal)
new ActionHandler(new Signal(signal))
})
handler.register(action)
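The SignalUtils change above is a one-word log fix, but the registration pattern it sits in is worth spelling out: multiple actions can register for one signal name, and on delivery each runs, with the signal considered handled if any action returned true. The sketch below models those semantics without real POSIX signals; the class name is hypothetical.

```scala
import scala.collection.mutable

// Models the action-chain pattern used for signal handling: every
// registered action runs on delivery, and the signal counts as handled
// if at least one action returned true.
class SketchSignalHandler {
  private val actions = mutable.ListBuffer.empty[() => Boolean]

  def register(action: () => Boolean): Unit = actions += action

  // Runs all actions; returns true if any of them handled the signal.
  def fire(): Boolean = actions.map(a => a()).exists(identity)
}
```

This is why the worker's decommissionSelf returns true: as a registered action it reports the signal as handled, preventing any fallback behaviour (such as exiting) from kicking in.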
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index a1d3077..a3e39d7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, Master}
import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils
@@ -44,13 +44,13 @@ class AppClientSuite
with Eventually
with ScalaFutures {
private val numWorkers = 2
- private val conf = new SparkConf()
- private val securityManager = new SecurityManager(conf)
+ private var conf: SparkConf = null
private var masterRpcEnv: RpcEnv = null
private var workerRpcEnvs: Seq[RpcEnv] = null
private var master: Master = null
private var workers: Seq[Worker] = null
+ private var securityManager: SecurityManager = null
/**
* Start the local cluster.
@@ -58,6 +58,8 @@ class AppClientSuite
*/
override def beforeAll(): Unit = {
super.beforeAll()
+ conf = new SparkConf().set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true")
+ securityManager = new SecurityManager(conf)
masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
workerRpcEnvs = (0 until numWorkers).map { i =>
RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
@@ -111,8 +113,23 @@ class AppClientSuite
assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed")
}
+
+ // Save the executor id before decommissioning so we can kill it
+ val application = getApplications().head
+ val executors = application.executors
+ val executorId: String = executors.head._2.fullId
+
+ // Send a decommission self to all the workers
+ // Note: normally the worker would send this on its own.
+ workers.foreach(worker => worker.decommissionSelf())
+
+ // Decommissioning is async.
+ eventually(timeout(1.seconds), interval(10.millis)) {
+ // We only record decommissioning for the executor we've requested
+ assert(ci.listener.execDecommissionedList.size === 1)
+ }
+
// Send request to kill executor, verify request was made
- val executorId: String = getApplications().head.executors.head._2.fullId
whenReady(
ci.client.killExecutors(Seq(executorId)),
timeout(10.seconds),
@@ -120,6 +137,15 @@ class AppClientSuite
assert(acknowledged)
}
+ // Verify that requesting executors after decommissioning is acknowledged,
+ // but that no executors are launched on the decommissioned workers.
+ whenReady(
+ ci.client.requestTotalExecutors(numExecutorsRequested),
+ timeout(10.seconds),
+ interval(10.millis)) { acknowledged =>
+ assert(acknowledged)
+ }
+ assert(getApplications().head.executors.size === 0)
+
// Issue stop command for Client to disconnect from Master
ci.client.stop()
@@ -189,6 +215,7 @@ class AppClientSuite
val deadReasonList = new ConcurrentLinkedQueue[String]()
val execAddedList = new ConcurrentLinkedQueue[String]()
val execRemovedList = new ConcurrentLinkedQueue[String]()
+ val execDecommissionedList = new ConcurrentLinkedQueue[String]()
def connected(id: String): Unit = {
connectedIdList.add(id)
@@ -218,6 +245,10 @@ class AppClientSuite
execRemovedList.add(id)
}
+ def executorDecommissioned(id: String, message: String): Unit = {
+ execDecommissionedList.add(id)
+ }
+
def workerRemoved(workerId: String, host: String, message: String): Unit = {}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 101e60c..e40b63f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -167,6 +167,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
+ override def executorDecommission(executorId: String) = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
@@ -707,6 +708,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true
+ override def executorDecommission(executorId: String): Unit = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 4e71ec1..9f593e0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -89,6 +89,7 @@ private class DummyTaskScheduler extends TaskScheduler {
override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
+ override def executorDecommission(executorId: String): Unit = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
new file mode 100644
index 0000000..15733b0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.Semaphore
+
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
+
+class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
+
+ override def beforeEach(): Unit = {
+ val conf = new SparkConf().setAppName("test").setMaster("local")
+ .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+
+ sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
+ }
+
+ test("verify task with no decommissioning works as expected") {
+ val input = sc.parallelize(1 to 10)
+ input.count()
+ val sleepyRdd = input.mapPartitions{ x =>
+ Thread.sleep(100)
+ x
+ }
+ assert(sleepyRdd.count() === 10)
+ }
+
+ test("verify a task with all workers decommissioned succeeds") {
+ val input = sc.parallelize(1 to 10)
+ // Do a count to wait for the executors to be registered.
+ input.count()
+ val sleepyRdd = input.mapPartitions{ x =>
+ Thread.sleep(50)
+ x
+ }
+ // Listen for the job
+ val sem = new Semaphore(0)
+ sc.addSparkListener(new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ sem.release()
+ }
+ })
+ // Start the task.
+ val asyncCount = sleepyRdd.countAsync()
+ // Wait for the job to have started
+ sem.acquire(1)
+ // Decommission all the executors, this should not halt the current task.
+ // decom.sh message passing is tested manually.
+ val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+ val execs = sched.getExecutorIds()
+ execs.foreach(execId => sched.decommissionExecutor(execId))
+ val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 2.seconds)
+ assert(asyncCountResult === 10)
+ // Try to launch a task after decommissioning; this should fail.
+ val postDecommissioned = input.map(x => x)
+ val postDecomAsyncCount = postDecommissioned.countAsync()
+ intercept[java.util.concurrent.TimeoutException] {
+ ThreadUtils.awaitResult(postDecomAsyncCount, 2.seconds)
+ }
+ assert(postDecomAsyncCount.isCompleted === false,
+ "No new task should be able to launch after executor decommissioning")
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 09943b7..f42f341 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -55,6 +55,9 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
}
}
+ def workerDecommissioning: Boolean =
+ sparkConf.get(org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED)
+
def nodeSelector: Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 6a26df2..f575241 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.rpc.RpcEndpointAddress
@@ -33,7 +34,7 @@ import org.apache.spark.util.Utils
private[spark] class BasicExecutorFeatureStep(
kubernetesConf: KubernetesExecutorConf,
secMgr: SecurityManager)
- extends KubernetesFeatureConfigStep {
+ extends KubernetesFeatureConfigStep with Logging {
// Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
private val executorContainerImage = kubernetesConf
@@ -186,6 +187,21 @@ private[spark] class BasicExecutorFeatureStep(
.endResources()
.build()
}.getOrElse(executorContainer)
+ val containerWithLifecycle =
+ if (!kubernetesConf.workerDecommissioning) {
+ logInfo("Decommissioning not enabled, skipping shutdown script")
+ containerWithLimitCores
+ } else {
+ logInfo("Adding decommission script to lifecycle")
+ new ContainerBuilder(containerWithLimitCores).withNewLifecycle()
+ .withNewPreStop()
+ .withNewExec()
+ .addToCommand("/opt/decom.sh")
+ .endExec()
+ .endPreStop()
+ .endLifecycle()
+ .build()
+ }
val ownerReference = kubernetesConf.driverPod.map { pod =>
new OwnerReferenceBuilder()
.withController(true)
@@ -213,6 +229,6 @@ private[spark] class BasicExecutorFeatureStep(
kubernetesConf.get(KUBERNETES_EXECUTOR_SCHEDULER_NAME)
.foreach(executorPod.getSpec.setSchedulerName)
- SparkPod(executorPod, containerWithLimitCores)
+ SparkPod(executorPod, containerWithLifecycle)
}
}
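The feature step above conditionally wraps the executor container with a preStop lifecycle hook so the kubelet runs `/opt/decom.sh` before stopping it. A minimal sketch of the resulting spec change, using plain dicts in place of the fabric8 ContainerBuilder:

```python
# Illustrative sketch of the container change: when decommissioning is
# enabled, the executor container gains a preStop exec hook (field names
# follow the Kubernetes container API; the helper name is hypothetical).
def with_decom_lifecycle(container: dict, decommissioning_enabled: bool) -> dict:
    if not decommissioning_enabled:
        return container
    updated = dict(container)
    updated["lifecycle"] = {"preStop": {"exec": {"command": ["/opt/decom.sh"]}}}
    return updated

base = {"name": "spark-executor", "image": "spark:latest"}
print(with_decom_lifecycle(base, True))
```

When the flag is off, the container passes through untouched, which is why the step logs and returns `containerWithLimitCores` unchanged in that branch.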
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
index 6ed37fc..cc65a7d 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
@@ -30,7 +30,7 @@ ARG spark_uid=185
RUN set -ex && \
apt-get update && \
ln -s /lib /lib64 && \
- apt install -y bash tini libc6 libpam-modules krb5-user libnss3 && \
+ apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps && \
mkdir -p /opt/spark && \
mkdir -p /opt/spark/examples && \
mkdir -p /opt/spark/work-dir && \
@@ -45,6 +45,7 @@ COPY jars /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
+COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
@@ -53,6 +54,7 @@ ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
+RUN chmod a+x /opt/decom.sh
ENTRYPOINT [ "/opt/entrypoint.sh" ]
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
new file mode 100755
index 0000000..8a5208d
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+set -ex
+echo "Asked to decommission"
+# Find the pid to signal
+date
+WORKER_PID=$(ps -o pid -C java | tail -n 1 | awk '{ sub(/^[ \t]+/, ""); print }')
+echo "Using worker pid $WORKER_PID"
+kill -s SIGPWR ${WORKER_PID}
+# For now we expect this to time out, since the executor backend does not yet
+# exit on decommissioning.
+echo "Waiting for worker pid to exit"
+# If the worker does exit, stop blocking the cleanup.
+timeout 60 tail --pid=${WORKER_PID} -f /dev/null
+date
+echo "Done"
+date
+sleep 30
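The script's core flow is: signal the worker with SIGPWR, then block for up to a grace period waiting for it to exit. The same flow for a child process, sketched in Python (SIGPWR is Linux-specific, as in the script; the function name is illustrative):

```python
import signal
import subprocess

def decommission(proc: subprocess.Popen, grace_seconds: float = 60.0) -> bool:
    """Mirror decom.sh's flow for a child process: send SIGPWR and wait up
    to `grace_seconds` for exit, analogous to the script's
    `timeout 60 tail --pid=$PID -f /dev/null`. Returns True on timely exit."""
    proc.send_signal(signal.SIGPWR)
    try:
        proc.wait(timeout=grace_seconds)
        return True
    except subprocess.TimeoutExpired:
        return False

# Demo: a stand-in "worker" with no SIGPWR handler, so the signal kills it.
worker = subprocess.Popen(["sleep", "30"])
print("Worker exited within the grace period:", decommission(worker, 10.0))
```

A Spark executor that handles SIGPWR would instead keep running while it drains, which is exactly why the script expects the wait to time out today.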
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 6ee3523..05ab782 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -30,9 +30,9 @@ set -e
# If there is no passwd entry for the container UID, attempt to create one
if [ -z "$uidentry" ] ; then
if [ -w /etc/passwd ] ; then
- echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
+ echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
else
- echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
+ echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
fi
fi
@@ -59,7 +59,7 @@ fi
# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
# It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s.
if [ -n ${HADOOP_HOME} ] && [ -z ${SPARK_DIST_CLASSPATH} ]; then
- export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
+ export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
fi
if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
index 607bb24..292abe9 100755
--- a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
+++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
@@ -16,7 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-set -xo errexit
+set -exo errexit
TEST_ROOT_DIR=$(git rev-parse --show-toplevel)
DEPLOY_MODE="minikube"
@@ -42,6 +42,9 @@ SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/nu
| grep -v "WARNING"\
| tail -n 1)
+export SCALA_VERSION
+echo $SCALA_VERSION
+
# Parse arguments
while (( "$#" )); do
case $1 in
@@ -110,7 +113,8 @@ while (( "$#" )); do
shift
;;
*)
- break
+ echo "Unexpected command line flag $1."
+ exit 1
;;
esac
shift
@@ -164,6 +168,7 @@ properties+=(
-Dspark.kubernetes.test.jvmImage=$JVM_IMAGE_NAME
-Dspark.kubernetes.test.pythonImage=$PYTHON_IMAGE_NAME
-Dspark.kubernetes.test.rImage=$R_IMAGE_NAME
+ -Dlog4j.logger.org.apache.spark=DEBUG
)
$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -Pkubernetes -Pkubernetes-integration-tests ${properties[@]}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
new file mode 100644
index 0000000..f5eab6e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.apache.spark.internal.config.Worker
+
+private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
+
+ import DecommissionSuite._
+ import KubernetesSuite.k8sTestTag
+
+ test("Test basic decommissioning", k8sTestTag) {
+ sparkAppConf
+ .set(Worker.WORKER_DECOMMISSION_ENABLED.key, "true")
+ .set("spark.kubernetes.pyspark.pythonVersion", "3")
+ .set("spark.kubernetes.container.image", pyImage)
+
+ runSparkApplicationAndVerifyCompletion(
+ appResource = PYSPARK_DECOMISSIONING,
+ mainClass = "",
+ expectedLogOnCompletion = Seq("decommissioning executor",
+ "Finished waiting, stopping Spark"),
+ appArgs = Array.empty[String],
+ driverPodChecker = doBasicDriverPyPodCheck,
+ executorPodChecker = doBasicExecutorPyPodCheck,
+ appLocator = appLocator,
+ isJVM = false,
+ decommissioningTest = true)
+ }
+}
+
+private[spark] object DecommissionSuite {
+ val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/"
+ val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py"
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 0d4fccc..61e1f27 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -42,7 +42,9 @@ import org.apache.spark.internal.config._
class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
- with DepsTestsSuite with RTestsSuite with Logging with Eventually with Matchers {
+ with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually
+ with Matchers {
+
import KubernetesSuite._
@@ -254,6 +256,7 @@ class KubernetesSuite extends SparkFunSuite
}
}
+ // scalastyle:off argcount
protected def runSparkApplicationAndVerifyCompletion(
appResource: String,
mainClass: String,
@@ -264,60 +267,120 @@ class KubernetesSuite extends SparkFunSuite
appLocator: String,
isJVM: Boolean,
pyFiles: Option[String] = None,
- executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
+ executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
+ decommissioningTest: Boolean = false): Unit = {
+
+ // scalastyle:on argcount
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
appArgs = appArgs)
- SparkAppLauncher.launch(
- appArguments,
- sparkAppConf,
- TIMEOUT.value.toSeconds.toInt,
- sparkHomeDir,
- isJVM,
- pyFiles)
- val driverPod = kubernetesTestComponents.kubernetesClient
- .pods()
- .withLabel("spark-app-locator", appLocator)
- .withLabel("spark-role", "driver")
- .list()
- .getItems
- .get(0)
- driverPodChecker(driverPod)
val execPods = scala.collection.mutable.Map[String, Pod]()
+ val (patienceInterval, patienceTimeout) = {
+ executorPatience match {
+ case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
+ case _ => (INTERVAL, TIMEOUT)
+ }
+ }
+ def checkPodReady(namespace: String, name: String) = {
+ val execPod = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withName(name)
+ .get()
+ val resourceStatus = execPod.getStatus
+ val conditions = resourceStatus.getConditions().asScala
+ val conditionTypes = conditions.map(_.getType())
+ val readyConditions = conditions.filter{cond => cond.getType() == "Ready"}
+ val result = readyConditions
+ .map(cond => cond.getStatus() == "True")
+ .headOption.getOrElse(false)
+ result
+ }
val execWatcher = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
.watch(new Watcher[Pod] {
- logInfo("Beginning watch of executors")
+ logDebug("Beginning watch of executors")
override def onClose(cause: KubernetesClientException): Unit =
logInfo("Ending watch of executors")
override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
val name = resource.getMetadata.getName
+ val namespace = resource.getMetadata().getNamespace()
action match {
- case Action.ADDED | Action.MODIFIED =>
+ case Action.MODIFIED =>
+ execPods(name) = resource
+ case Action.ADDED =>
+ logDebug(s"Add event received for $name.")
execPods(name) = resource
+ // If testing decommissioning, simulate it for the first
+ // executor by deleting its pod once it is ready.
+ if (decommissioningTest && execPods.size == 1) {
+ // Wait for all the containers in the pod to be running
+ logDebug("Waiting for first pod to become OK prior to deletion")
+ Eventually.eventually(patienceTimeout, patienceInterval) {
+ val result = checkPodReady(namespace, name)
+ result shouldBe (true)
+ }
+ // Sleep a small interval to allow the job to make progress.
+ logDebug("Sleeping before killing pod.")
+ Thread.sleep(2000)
+ // Delete the pod to simulate cluster scale down/migration.
+ val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name)
+ pod.delete()
+ logDebug(s"Triggered pod decom/delete: $name deleted")
+ }
case Action.DELETED | Action.ERROR =>
execPods.remove(name)
}
}
})
- val (patienceInterval, patienceTimeout) = {
- executorPatience match {
- case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
- case _ => (INTERVAL, TIMEOUT)
- }
- }
+ logDebug("Starting Spark K8s job")
+ SparkAppLauncher.launch(
+ appArguments,
+ sparkAppConf,
+ TIMEOUT.value.toSeconds.toInt,
+ sparkHomeDir,
+ isJVM,
+ pyFiles)
+ val driverPod = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-role", "driver")
+ .list()
+ .getItems
+ .get(0)
+
+ driverPodChecker(driverPod)
+ // If we're testing decommissioning we delete all the executors, but we
+ // should still have seen at least one executor at some point.
Eventually.eventually(patienceTimeout, patienceInterval) {
execPods.values.nonEmpty should be (true)
}
+ // If decommissioning we need to wait and check the executors were removed
+ if (decommissioningTest) {
+ // Sleep a small interval to ensure everything is registered.
+ Thread.sleep(100)
+ // Wait for the executors to become ready
+ Eventually.eventually(patienceTimeout, patienceInterval) {
+ val anyReadyPods = execPods.exists {
+ case (name, resource) => checkPodReady(resource.getMetadata().getNamespace(), name)
+ }
+ val podsEmpty = execPods.values.isEmpty
+ val podsReadyOrDead = anyReadyPods || podsEmpty
+ podsReadyOrDead shouldBe (true)
+ }
+ }
execWatcher.close()
execPods.values.foreach(executorPodChecker(_))
- Eventually.eventually(TIMEOUT, patienceInterval) {
+ Eventually.eventually(patienceTimeout, patienceInterval) {
expectedLogOnCompletion.foreach { e =>
assert(kubernetesTestComponents.kubernetesClient
.pods()
@@ -425,5 +488,5 @@ private[spark] object KubernetesSuite {
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
- val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+ val INTERVAL = PatienceConfiguration.Interval(Span(1, Seconds))
}
diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
new file mode 100644
index 0000000..f68f24d
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+import time
+
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+ """
+ Usage: decommissioning
+ """
+ print("Starting decom test")
+ spark = SparkSession \
+ .builder \
+ .appName("PyMemoryTest") \
+ .getOrCreate()
+ sc = spark._sc
+ rdd = sc.parallelize(range(10))
+ rdd.collect()
+ print("Waiting to give nodes time to finish.")
+ time.sleep(5)
+ rdd.collect()
+ print("Waiting some more....")
+ time.sleep(10)
+ rdd.collect()
+ print("Finished waiting, stopping Spark.")
+ spark.stop()
+ print("Done, exiting Python")
+ sys.exit(0)
diff --git a/sbin/decommission-slave.sh b/sbin/decommission-slave.sh
new file mode 100644
index 0000000..4bbf257
--- /dev/null
+++ b/sbin/decommission-slave.sh
@@ -0,0 +1,57 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A shell script to decommission all workers on a single slave
+#
+# Environment variables
+#
+# SPARK_WORKER_INSTANCES The number of worker instances that should be
+# running on this slave. Default is 1.
+
+# Usage: decommission-slave.sh [--block-until-exit]
+# Decommissions all workers on this slave machine
+
+set -ex
+
+if [ -z "${SPARK_HOME}" ]; then
+ export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
+
+. "${SPARK_HOME}/sbin/spark-config.sh"
+
+. "${SPARK_HOME}/bin/load-spark-env.sh"
+
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+ "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker 1
+else
+ for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+ "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker $(( $i + 1 ))
+ done
+fi
+
+# Check if --block-until-exit is set.
+# This is done for systems which block on the decommissioning script and on
+# exit shut down the entire system (e.g. K8s).
+if [ "$1" == "--block-until-exit" ]; then
+ shift
+ # For now we only block on the 0th instance if there are multiple instances.
+ instance=$1
+ pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"
+ wait $pid
+fi
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index 6de67e0..81f2fd4 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -215,6 +215,21 @@ case $option in
fi
;;
+ (decommission)
+
+ if [ -f $pid ]; then
+ TARGET_ID="$(cat "$pid")"
+ if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
+ echo "decommissioning $command"
+ kill -s SIGPWR "$TARGET_ID"
+ else
+ echo "no $command to decommission"
+ fi
+ else
+ echo "no $command to decommission"
+ fi
+ ;;
+
(status)
if [ -f $pid ]; then