You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by wu...@apache.org on 2021/07/01 03:41:14 UTC
[spark] branch branch-3.1 updated: [SPARK-35714][FOLLOW-UP][CORE]
Use a shared stopping flag for WorkerWatcher to avoid the duplicate
System.exit
This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9229eb9 [SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerWatcher to avoid the duplicate System.exit
9229eb9 is described below
commit 9229eb9a94cfcfd8c030b7dff4ab81227fffc742
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Jul 1 11:40:00 2021 +0800
[SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerWatcher to avoid the duplicate System.exit
### What changes were proposed in this pull request?
This PR lets `WorkerWatcher` reuse the `stopping` flag of `CoarseGrainedExecutorBackend`, so that `System.exit` is not called a second time once either of them has already started exiting the process.
### Why are the changes needed?
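As a follow-up to https://github.com/apache/spark/pull/32868, this PR gives a more robust fix: instead of running `System.exit` inside a `Future` and waiting on it with a 5-second timeout, `WorkerWatcher` and `CoarseGrainedExecutorBackend` now guard `System.exit` with the same shared `AtomicBoolean`, so only the first caller to flip the flag actually exits.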
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed existing tests.
Closes #33028 from Ngone51/spark-35714-followup.
Lead-authored-by: yi.wu <yi...@databricks.com>
Co-authored-by: wuyi <yi...@databricks.com>
Signed-off-by: yi.wu <yi...@databricks.com>
(cherry picked from commit 868a59470650cc12272de0d0b04c6d98b1fe076d)
Signed-off-by: yi.wu <yi...@databricks.com>
---
.../apache/spark/deploy/worker/WorkerWatcher.scala | 17 ++++++-----
.../executor/CoarseGrainedExecutorBackend.scala | 33 +++++++++++++---------
2 files changed, 27 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 43ec492..efffc9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -17,13 +17,10 @@
package org.apache.spark.deploy.worker
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.concurrent.duration._
+import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
-import org.apache.spark.util.ThreadUtils
/**
* Endpoint which connects to a worker process and terminates the JVM if the
@@ -31,7 +28,10 @@ import org.apache.spark.util.ThreadUtils
* Provides fate sharing between a worker and its associated child processes.
*/
private[spark] class WorkerWatcher(
- override val rpcEnv: RpcEnv, workerUrl: String, isTesting: Boolean = false)
+ override val rpcEnv: RpcEnv,
+ workerUrl: String,
+ isTesting: Boolean = false,
+ isChildProcessStopping: AtomicBoolean = new AtomicBoolean(false))
extends RpcEndpoint with Logging {
logInfo(s"Connecting to worker $workerUrl")
@@ -53,10 +53,9 @@ private[spark] class WorkerWatcher(
private def exitNonZero() =
if (isTesting) {
isShutDown = true
- } else {
- ThreadUtils.awaitResult(Future {
- System.exit(-1)
- }, 5.seconds)
+ } else if (isChildProcessStopping.compareAndSet(false, true)) {
+ // SPARK-35714: avoid the duplicate call of `System.exit` to avoid the dead lock
+ System.exit(-1)
}
override def receive: PartialFunction[Any, Unit] = {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d607ee8..95237c9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -60,7 +60,7 @@ private[spark] class CoarseGrainedExecutorBackend(
private implicit val formats = DefaultFormats
- private[this] val stopping = new AtomicBoolean(false)
+ private[executor] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None
@@ -261,18 +261,22 @@ private[spark] class CoarseGrainedExecutorBackend(
reason: String,
throwable: Throwable = null,
notifyDriver: Boolean = true) = {
- val message = "Executor self-exiting due to : " + reason
- if (throwable != null) {
- logError(message, throwable)
- } else {
- logError(message)
- }
+ if (stopping.compareAndSet(false, true)) {
+ val message = "Executor self-exiting due to : " + reason
+ if (throwable != null) {
+ logError(message, throwable)
+ } else {
+ logError(message)
+ }
- if (notifyDriver && driver.nonEmpty) {
- driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
- }
+ if (notifyDriver && driver.nonEmpty) {
+ driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
+ }
- System.exit(code)
+ System.exit(code)
+ } else {
+ logInfo("Skip exiting executor since it's been already asked to exit before.")
+ }
}
private def decommissionSelf(): Unit = {
@@ -441,10 +445,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
- env.rpcEnv.setupEndpoint("Executor",
- backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
+ val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
+ env.rpcEnv.setupEndpoint("Executor", backend)
arguments.workerUrl.foreach { url =>
- env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
+ env.rpcEnv.setupEndpoint("WorkerWatcher",
+ new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping))
}
env.rpcEnv.awaitTermination()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org