Posted to commits@spark.apache.org by tg...@apache.org on 2021/10/11 13:25:32 UTC

[spark] branch master updated: [SPARK-36540][YARN] YARN-CLIENT mode should check Shutdown message when AMEndpoint disconnected

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 20051eb  [SPARK-36540][YARN] YARN-CLIENT mode should check Shutdown message when AMEndpoint disconnected
20051eb is described below

commit 20051eb69904de6afc27fe5adb18bcc760c78701
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Oct 11 08:24:49 2021 -0500

    [SPARK-36540][YARN] YARN-CLIENT mode should check Shutdown message when AMEndpoint disconnected
    
    ### What changes were proposed in this pull request?
    We hit a case where the AM lost its connection:
    ```
    21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to xx.xx.xx.xx:41420; closing connection
    java.nio.channels.ClosedChannelException
            at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
            at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
            at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
            at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
            at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
            at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
            at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
            at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)
    ```
    
    Looking at the client-mode code path: when the AMEndpoint is disconnected, it finishes the application with a final status of SUCCEEDED.
    ```
    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
      // This avoids potentially reporting incorrect exit codes if the driver fails
      if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
      }
    }
    ```
    Normally, in client mode, when the application succeeds the driver stops and the AM loses its connection, so exiting with SUCCESS is fine. But if a network problem causes the disconnect, finishing with a final status of SUCCESS is not correct.
    YarnClientSchedulerBackend then receives an application report with a final status of SUCCESS and stops the SparkContext, so the application has effectively failed but is marked as a normal stop.
    ```
      private class MonitorThread extends Thread {
        private var allowInterrupt = true
    
        override def run() {
          try {
            val YarnAppReport(_, state, diags) =
              client.monitorApplication(appId.get, logApplicationReport = false)
            logError(s"YARN application has exited unexpectedly with state $state! " +
              "Check the YARN application logs for more details.")
            diags.foreach { err =>
              logError(s"Diagnostics message: $err")
            }
            allowInterrupt = false
            sc.stop()
          } catch {
            case e: InterruptedException => logInfo("Interrupting monitor thread")
          }
        }
    
        def stopMonitor(): Unit = {
          if (allowInterrupt) {
            this.interrupt()
          }
        }
      }
    ```
    IMO, the driver should send a `Shutdown` message to the AM in yarn-client mode, so that the AM can tell a clean shutdown from an unexpected disconnect.
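    
    The intended behavior can be sketched as a small stand-alone model. This is only an illustration, not the Spark code: `ShutdownHandshakeSketch`, `AmModel` and the local `FinalStatus` type are hypothetical stand-ins, and the `treatDisconnectAsFailed` flag mirrors the `spark.yarn.am.clientModeTreatDisconnectAsFailed` config added in the diff below.
    ```scala
    // Illustrative model only; every name here is hypothetical, not Spark's API.
    object ShutdownHandshakeSketch {
      sealed trait FinalStatus
      case object Succeeded extends FinalStatus
      case object Failed extends FinalStatus

      // Stand-in for the message the driver sends on a clean stop.
      case object Shutdown

      // Stand-in for the AM-side endpoint in yarn-client mode.
      final class AmModel(treatDisconnectAsFailed: Boolean) {
        @volatile private var shutdown = false

        // Record that the driver announced a clean stop.
        def receive(msg: Any): Unit = msg match {
          case Shutdown => shutdown = true
          case _ => // other messages are ignored in this sketch
        }

        // A disconnect counts as a failure only when the flag is on
        // and no Shutdown message was seen beforehand.
        def onDisconnected(): FinalStatus =
          if (shutdown || !treatDisconnectAsFailed) Succeeded else Failed
      }

      def main(args: Array[String]): Unit = {
        val clean = new AmModel(treatDisconnectAsFailed = true)
        clean.receive(Shutdown)
        println(s"clean stop: ${clean.onDisconnected()}")

        val unclean = new AmModel(treatDisconnectAsFailed = true)
        println(s"unclean disconnect: ${unclean.onDisconnected()}")

        val flagOff = new AmModel(treatDisconnectAsFailed = false)
        println(s"flag off: ${flagOff.onDisconnected()}")
      }
    }
    ```
    With the flag off, the model reports success on every disconnect, which matches the behavior before this change.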
    
    ### Why are the changes needed?
    Fix bug
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    Closes #33780 from AngersZhuuuu/SPARK-36540.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 docs/running-on-yarn.md                                   | 13 +++++++++++++
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 15 +++++++++++++--
 .../main/scala/org/apache/spark/deploy/yarn/config.scala  | 15 +++++++++++++++
 .../scheduler/cluster/YarnClientSchedulerBackend.scala    |  1 +
 .../spark/scheduler/cluster/YarnSchedulerBackend.scala    | 13 +++++++++++--
 5 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 37ff479..8b7ed18 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -442,6 +442,19 @@ To use a custom metrics.properties for the application master and executors, upd
   <td>1.6.0</td>
 </tr>
 <tr>
+  <td><code>spark.yarn.am.clientModeTreatDisconnectAsFailed</code></td>
+  <td>false</td>
+  <td>
+  Treat yarn-client unclean disconnects as failures. In yarn-client mode, normally the application will always finish
+  with a final status of SUCCESS because in some cases, it is not possible to know if the Application was terminated
+  intentionally by the user or if there was a real error. This config changes that behavior such that if the Application
+  Master disconnects from the driver uncleanly (i.e. without the proper shutdown handshake) the application will
+  terminate with a final status of FAILED. This will allow the caller to decide if it was truly a failure. Note that if
+  this config is set and the user just terminates the client application badly it may show a status of FAILED when it wasn't really FAILED.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
   <td><code>spark.yarn.am.clientModeExitOnError</code></td>
   <td>false</td>
   <td>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1c62d0a..a25edb0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -784,6 +784,9 @@ private[spark] class ApplicationMaster(
    */
   private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
     extends RpcEndpoint with Logging {
+    @volatile private var shutdown = false
+    private val clientModeTreatDisconnectAsFailed =
+      sparkConf.get(AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED)
 
     override def onStart(): Unit = {
       driver.send(RegisterClusterManager(self))
@@ -801,6 +804,8 @@ private[spark] class ApplicationMaster(
     override def receive: PartialFunction[Any, Unit] = {
       case UpdateDelegationTokens(tokens) =>
         SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
+
+      case Shutdown => shutdown = true
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -843,8 +848,13 @@ private[spark] class ApplicationMaster(
       // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
       // This avoids potentially reporting incorrect exit codes if the driver fails
       if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
-        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
-        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+        if (shutdown || !clientModeTreatDisconnectAsFailed) {
+          logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
+          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+        } else {
+          logError(s"Application Master lost connection with driver! Shutting down. $remoteAddress")
+          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED)
+        }
       }
     }
   }
@@ -862,6 +872,7 @@ object ApplicationMaster extends Logging {
   private val EXIT_SECURITY = 14
   private val EXIT_EXCEPTION_USER_CLASS = 15
   private val EXIT_EARLY = 16
+  private val EXIT_DISCONNECTED = 17
 
   private var master: ApplicationMaster = _
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index ab2063c..1270f1e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -52,6 +52,21 @@ package object config extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED =
+    ConfigBuilder("spark.yarn.am.clientModeTreatDisconnectAsFailed")
+      .doc("Treat yarn-client unclean disconnects as failures. In yarn-client mode, normally the " +
+        "application will always finish with a final status of SUCCESS because in some cases, " +
+        "it is not possible to know if the Application was terminated intentionally by the user " +
+        "or if there was a real error. This config changes that behavior such that " +
+        "if the Application Master disconnects from the driver uncleanly (i.e. without the proper" +
+        " shutdown handshake) the application will terminate with a final status of FAILED. " +
+        "This will allow the caller to decide if it was truly a failure. Note that " +
+        "if this config is set and the user just terminates the client application badly " +
+        "it may show a status of FAILED when it wasn't really FAILED.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val AM_CLIENT_MODE_EXIT_ON_ERROR =
     ConfigBuilder("spark.yarn.am.clientModeExitOnError")
       .doc("In yarn-client mode, when this is true, if driver got " +
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 28c8652..1b70e40 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -161,6 +161,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   override def stop(): Unit = {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
+    yarnSchedulerEndpoint.handleClientModeDriverStop()
     if (monitorThread != null) {
       monitorThread.stopMonitor()
     }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index c5c4594..c3aea37 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -58,7 +58,7 @@ private[spark] abstract class YarnSchedulerBackend(
 
   protected var totalExpectedExecutors = 0
 
-  private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+  protected val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
   protected var amEndpoint: Option[RpcEndpointRef] = None
 
   private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
@@ -291,7 +291,7 @@ private[spark] abstract class YarnSchedulerBackend(
   /**
    * An [[RpcEndpoint]] that communicates with the ApplicationMaster.
    */
-  private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
+  protected class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
 
     private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
@@ -319,6 +319,15 @@ private[spark] abstract class YarnSchedulerBackend(
       removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
     }
 
+    private[cluster] def handleClientModeDriverStop(): Unit = {
+      amEndpoint match {
+        case Some(am) =>
+          am.send(Shutdown)
+        case None =>
+          logWarning("Attempted to send shutdown message before the AM has registered!")
+      }
+    }
+
     override def receive: PartialFunction[Any, Unit] = {
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
