You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by wu...@apache.org on 2021/07/01 03:41:14 UTC

[spark] branch branch-3.1 updated: [SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerWatcher to avoid the duplicate System.exit

This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9229eb9  [SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerWatcher to avoid the duplicate System.exit
9229eb9 is described below

commit 9229eb9a94cfcfd8c030b7dff4ab81227fffc742
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Jul 1 11:40:00 2021 +0800

    [SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerWatcher to avoid the duplicate System.exit
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to let `WorkerWatcher` reuse the `stopping` flag in `CoarseGrainedExecutorBackend`, so that `System.exit` is not called a second time when both of them try to terminate the executor process.
    
    ### Why are the changes needed?
    
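    As a follow-up to https://github.com/apache/spark/pull/32868, this PR provides a more robust fix: instead of running `System.exit` inside a `Future` with a 5-second timeout, `WorkerWatcher` and the executor backend's exit path now guard `System.exit` with the same `AtomicBoolean`, so only the first of them to flip the flag actually calls it.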
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass existing tests.
    
    Closes #33028 from Ngone51/spark-35714-followup.
    
    Lead-authored-by: yi.wu <yi...@databricks.com>
    Co-authored-by: wuyi <yi...@databricks.com>
    Signed-off-by: yi.wu <yi...@databricks.com>
    (cherry picked from commit 868a59470650cc12272de0d0b04c6d98b1fe076d)
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../apache/spark/deploy/worker/WorkerWatcher.scala | 17 ++++++-----
 .../executor/CoarseGrainedExecutorBackend.scala    | 33 +++++++++++++---------
 2 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 43ec492..efffc9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -17,13 +17,10 @@
 
 package org.apache.spark.deploy.worker
 
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.concurrent.duration._
+import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
-import org.apache.spark.util.ThreadUtils
 
 /**
  * Endpoint which connects to a worker process and terminates the JVM if the
@@ -31,7 +28,10 @@ import org.apache.spark.util.ThreadUtils
  * Provides fate sharing between a worker and its associated child processes.
  */
 private[spark] class WorkerWatcher(
-    override val rpcEnv: RpcEnv, workerUrl: String, isTesting: Boolean = false)
+    override val rpcEnv: RpcEnv,
+    workerUrl: String,
+    isTesting: Boolean = false,
+    isChildProcessStopping: AtomicBoolean = new AtomicBoolean(false))
   extends RpcEndpoint with Logging {
 
   logInfo(s"Connecting to worker $workerUrl")
@@ -53,10 +53,9 @@ private[spark] class WorkerWatcher(
   private def exitNonZero() =
     if (isTesting) {
       isShutDown = true
-    } else {
-      ThreadUtils.awaitResult(Future {
-        System.exit(-1)
-      }, 5.seconds)
+    } else if (isChildProcessStopping.compareAndSet(false, true)) {
+      // SPARK-35714: skip `System.exit` if an exit is already in progress, to avoid a deadlock
+      System.exit(-1)
     }
 
   override def receive: PartialFunction[Any, Unit] = {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d607ee8..95237c9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -60,7 +60,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private implicit val formats = DefaultFormats
 
-  private[this] val stopping = new AtomicBoolean(false)
+  private[executor] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
 
@@ -261,18 +261,22 @@ private[spark] class CoarseGrainedExecutorBackend(
                              reason: String,
                              throwable: Throwable = null,
                              notifyDriver: Boolean = true) = {
-    val message = "Executor self-exiting due to : " + reason
-    if (throwable != null) {
-      logError(message, throwable)
-    } else {
-      logError(message)
-    }
+    if (stopping.compareAndSet(false, true)) {
+      val message = "Executor self-exiting due to : " + reason
+      if (throwable != null) {
+        logError(message, throwable)
+      } else {
+        logError(message)
+      }
 
-    if (notifyDriver && driver.nonEmpty) {
-      driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
-    }
+      if (notifyDriver && driver.nonEmpty) {
+        driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
+      }
 
-    System.exit(code)
+      System.exit(code)
+    } else {
+      logInfo("Skip exiting executor since it's been already asked to exit before.")
+    }
   }
 
   private def decommissionSelf(): Unit = {
@@ -441,10 +445,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
 
-      env.rpcEnv.setupEndpoint("Executor",
-        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
+      val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
+      env.rpcEnv.setupEndpoint("Executor", backend)
       arguments.workerUrl.foreach { url =>
-        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
+        env.rpcEnv.setupEndpoint("WorkerWatcher",
+          new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping))
       }
       env.rpcEnv.awaitTermination()
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org