Posted to issues@spark.apache.org by "Thomas Graves (Jira)" <ji...@apache.org> on 2021/10/11 13:27:00 UTC

[jira] [Resolved] (SPARK-36540) AM should not just finish with Success when disconnected

     [ https://issues.apache.org/jira/browse/SPARK-36540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Graves resolved SPARK-36540.
-----------------------------------
    Fix Version/s: 3.3.0
         Assignee: angerszhu
       Resolution: Fixed

> AM should not just finish with Success when disconnected
> --------------------------------------------------------
>
>                 Key: SPARK-36540
>                 URL: https://issues.apache.org/jira/browse/SPARK-36540
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core, YARN
>    Affects Versions: 3.2.0
>            Reporter: angerszhu
>            Assignee: angerszhu
>            Priority: Major
>             Fix For: 3.3.0
>
>
> We hit a case where the AM loses its connection:
> {code}
> 21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to xx.xx.xx.xx:41420; closing connection
> java.nio.channels.ClosedChannelException
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>         at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>         at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
>         at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>         at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>         at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> Checking the client-side code: when the AMEndpoint is disconnected, the AM finishes the application with a SUCCEEDED final status:
> {code}
> override def onDisconnected(remoteAddress: RpcAddress): Unit = {
>       // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
>       // This avoids potentially reporting incorrect exit codes if the driver fails
>       if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
>         logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
>         finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
>       }
>     }
> {code}
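The decision logic in onDisconnected above can be modeled in isolation. A minimal, self-contained sketch (hypothetical names, not Spark's actual API) of the distinction a fix needs to draw: only a disconnect that follows a driver-initiated shutdown should map to SUCCEEDED, while an unexpected drop (e.g. a network failure) should not:

```scala
object AmDisconnectSketch {
  sealed trait FinalStatus
  case object Succeeded extends FinalStatus
  case object Failed extends FinalStatus

  // Hypothetical flag: true only if the driver signalled a clean shutdown
  // before the RPC connection dropped. Spark's real fix may track this
  // differently; this just models the decision.
  def finalStatusOnDisconnect(shutdownRequested: Boolean): FinalStatus =
    if (shutdownRequested) Succeeded
    else Failed // a surprise disconnect (network problem) is not a success
}
```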
> Normally in client mode, when the application succeeds, the driver stops and the AM loses the connection, so exiting with SUCCESS in that case is fine. But if a network problem causes the disconnection, still finishing with a SUCCESS final status is not correct.
> YarnClientSchedulerBackend then receives an application report with a SUCCEEDED final status and stops the SparkContext, so an application that actually failed is marked as a normal stop.
> {code}
>   private class MonitorThread extends Thread {
>     private var allowInterrupt = true
>     override def run() {
>       try {
>         val YarnAppReport(_, state, diags) =
>           client.monitorApplication(appId.get, logApplicationReport = false)
>         logError(s"YARN application has exited unexpectedly with state $state! " +
>           "Check the YARN application logs for more details.")
>         diags.foreach { err =>
>           logError(s"Diagnostics message: $err")
>         }
>         allowInterrupt = false
>         sc.stop()
>       } catch {
>         case e: InterruptedException => logInfo("Interrupting monitor thread")
>       }
>     }
>     def stopMonitor(): Unit = {
>       if (allowInterrupt) {
>         this.interrupt()
>       }
>     }
>   }
> {code}
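The allowInterrupt flag above is a generic interrupt-guard pattern: the monitor disables external interruption just before it begins its own cleanup (sc.stop()), so stopMonitor() cannot interrupt the shutdown mid-flight. A self-contained sketch of the same pattern, with hypothetical names and stand-ins for the Spark calls:

```scala
import java.util.concurrent.CountDownLatch

// Sketch of the interrupt-guard pattern used by MonitorThread: once the
// worker starts its own cleanup, external interrupts are suppressed.
class GuardedMonitor(onExit: () => Unit) extends Thread {
  @volatile private var allowInterrupt = true
  private val started = new CountDownLatch(1)

  override def run(): Unit = {
    started.countDown()
    try {
      Thread.sleep(Long.MaxValue) // stand-in for client.monitorApplication(...)
      allowInterrupt = false      // past this point, don't interrupt the cleanup
      onExit()                    // stand-in for sc.stop()
    } catch {
      case _: InterruptedException => () // stopMonitor() interrupted us first
    }
  }

  def stopMonitor(): Unit = {
    started.await() // make sure run() has begun before deciding
    if (allowInterrupt) interrupt()
  }
}
```

Here the monitor blocks forever, so stopMonitor() always wins the race and the cleanup callback never fires; in the real code the flag flips only after monitorApplication returns with a terminal state.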



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org