You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/12/31 21:13:42 UTC
[spark] branch master updated: [SPARK-31946][CORE] Make
worker/executor decommission signal configurable
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe5614 [SPARK-31946][CORE] Make worker/executor decommission signal configurable
3fe5614 is described below
commit 3fe5614a7cc8f5b65a90924e9a4a535fcaf76a98
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Dec 31 13:13:02 2020 -0800
[SPARK-31946][CORE] Make worker/executor decommission signal configurable
### What changes were proposed in this pull request?
This PR proposed to make worker/executor decommission signal configurable.
* Added confs: `spark.worker.decommission.signal` / `spark.executor.decommission.signal`
* Rename `WorkerSigPWRReceived`/ `ExecutorSigPWRReceived` to `WorkerDecommissionSigReceived`/ `ExecutorDecommissionSigReceived`
### Why are the changes needed?
The current signal `PWR` can't work on macOS, since `PWR` is not a POSIX-standard signal and macOS does not provide it. So the developers currently can't do end-to-end decommission tests in their macOS environment.
Besides, the configuration becomes more flexible for users in case the default signal (`PWR`) conflicts with their own applications/environment.
### Does this PR introduce _any_ user-facing change?
No (it's a new API for 3.2)
### How was this patch tested?
Manually tested.
Closes #30968 from Ngone51/configurable-decom-signal.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../main/scala/org/apache/spark/deploy/DeployMessage.scala | 4 ++--
.../main/scala/org/apache/spark/deploy/worker/Worker.scala | 13 +++++++------
.../spark/executor/CoarseGrainedExecutorBackend.scala | 9 +++++----
.../scala/org/apache/spark/internal/config/Worker.scala | 7 +++++++
.../scala/org/apache/spark/internal/config/package.scala | 7 +++++++
.../scheduler/cluster/CoarseGrainedClusterMessage.scala | 4 ++--
6 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index d5b5375..727cdbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -77,10 +77,10 @@ private[deploy] object DeployMessages {
object DecommissionWorker extends DeployMessage
/**
- * A message that sent by the Worker to itself when it receives PWR signal,
+ * A message that sent by the Worker to itself when it receives a signal,
* indicating the Worker starts to decommission.
*/
- object WorkerSigPWRReceived extends DeployMessage
+ object WorkerDecommissionSigReceived extends DeployMessage
/**
* A message sent from Worker to Master to tell Master that the Worker has started
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a6092f6..a3c7375 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -66,16 +66,17 @@ private[deploy] class Worker(
Utils.checkHost(host)
assert (port > 0)
- // If worker decommissioning is enabled register a handler on PWR to shutdown.
+ // If worker decommissioning is enabled register a handler on the configured signal to shutdown.
if (conf.get(config.DECOMMISSION_ENABLED)) {
- logInfo("Registering SIGPWR handler to trigger decommissioning.")
- SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
+ val signal = conf.get(config.Worker.WORKER_DECOMMISSION_SIGNAL)
+ logInfo(s"Registering SIG$signal handler to trigger decommissioning.")
+ SignalUtils.register(signal, s"Failed to register SIG$signal handler - " +
"disabling worker decommission feature.") {
- self.send(WorkerSigPWRReceived)
+ self.send(WorkerDecommissionSigReceived)
true
}
} else {
- logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
+ logInfo("Worker decommissioning not enabled.")
}
// A scheduled executor used to send messages at the specified time.
@@ -682,7 +683,7 @@ private[deploy] class Worker(
case DecommissionWorker =>
decommissionSelf()
- case WorkerSigPWRReceived =>
+ case WorkerDecommissionSigReceived =>
decommissionSelf()
// Tell the Master that we are starting decommissioning
// so it stops trying to launch executor/driver on us
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6a1fd57..e1d3009 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -82,9 +82,10 @@ private[spark] class CoarseGrainedExecutorBackend(
override def onStart(): Unit = {
if (env.conf.get(DECOMMISSION_ENABLED)) {
- logInfo("Registering PWR handler to trigger decommissioning.")
- SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
- "disabling executor decommission feature.") (self.askSync[Boolean](ExecutorSigPWRReceived))
+ val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
+ logInfo(s"Registering SIG$signal handler to trigger decommissioning.")
+ SignalUtils.register(signal, s"Failed to register SIG$signal handler - disabling" +
+ s" executor decommission feature.") (self.askSync[Boolean](ExecutorDecommissionSigReceived))
}
logInfo("Connecting to driver: " + driverUrl)
@@ -208,7 +209,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case ExecutorSigPWRReceived =>
+ case ExecutorDecommissionSigReceived =>
var driverNotified = false
try {
driver.foreach { driverRef =>
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index a807271..fda3a57 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -82,4 +82,11 @@ private[spark] object Worker {
.version("2.0.2")
.intConf
.createWithDefault(100)
+
+ val WORKER_DECOMMISSION_SIGNAL =
+ ConfigBuilder("spark.worker.decommission.signal")
+ .doc("The signal that used to trigger the worker to start decommission.")
+ .version("3.2.0")
+ .stringConf
+ .createWithDefaultString("PWR")
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index cbf4a97..adaf92d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1927,6 +1927,13 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createOptional
+ private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
+ ConfigBuilder("spark.executor.decommission.signal")
+ .doc("The signal that used to trigger the executor to start decommission.")
+ .version("3.2.0")
+ .stringConf
+ .createWithDefaultString("PWR")
+
private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e084453..2f17143 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -102,9 +102,9 @@ private[spark] object CoarseGrainedClusterMessages {
// It's used for Standalone's cases, where decommission is triggered at MasterWebUI or Worker.
object DecommissionExecutor extends CoarseGrainedClusterMessage
- // A message that sent to the executor itself when it receives PWR signal,
+ // A message that sent to the executor itself when it receives a signal,
// indicating the executor starts to decommission.
- object ExecutorSigPWRReceived extends CoarseGrainedClusterMessage
+ object ExecutorDecommissionSigReceived extends CoarseGrainedClusterMessage
case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org