Posted to issues@spark.apache.org by "angerszhu (Jira)" <ji...@apache.org> on 2021/08/18 08:03:00 UTC

[jira] [Updated] (SPARK-36540) AM should not just finish with Success when disconnected

     [ https://issues.apache.org/jira/browse/SPARK-36540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

angerszhu updated SPARK-36540:
------------------------------
    Description: 
We encountered a case where the AM lost its connection:
{code}
21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to ip-10-128-152-193.idata-server.shopee.io/10.128.152.193:41420; closing connection
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
{code}

Looking at the client-mode code path: when the AMEndpoint is disconnected, the AM finishes the application with a SUCCEEDED final status:
{code}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
  // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
  // This avoids potentially reporting incorrect exit codes if the driver fails
  if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
    logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
    finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
  }
}
{code}
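A disconnect event by itself cannot tell a normal driver exit apart from a lost connection. One way to make that distinction is sketched below in Scala, under the assumption that the driver explicitly announces a normal stop to the AM before closing the connection, so that any disconnect without that announcement is treated as a failure. `AmDisconnectSketch`, `DriverLink`, `onDriverAnnouncedStop`, `Outcome`, and the exit codes are hypothetical names for illustration only; they are not Spark APIs and this is not necessarily how the issue gets fixed.

{code}
// Hypothetical sketch: none of these names exist in Spark; they only model the idea.
object AmDisconnectSketch {

  // Local stand-ins for the two YARN final statuses relevant here.
  sealed trait FinalStatus
  case object Succeeded extends FinalStatus
  case object Failed extends FinalStatus

  // Final status plus exit code; the numeric codes are placeholders.
  final case class Outcome(status: FinalStatus, exitCode: Int)

  /** State a client-mode AM would need to track about its driver. */
  final class DriverLink {
    // Set when the driver explicitly tells the AM it is stopping normally.
    @volatile private var gracefulStopAnnounced = false

    def onDriverAnnouncedStop(): Unit = gracefulStopAnnounced = true

    /** Decide the final status when the RPC connection to the driver drops. */
    def onDisconnected(): Outcome =
      if (gracefulStopAnnounced) Outcome(Succeeded, 0) // driver said goodbye first
      else Outcome(Failed, 1) // connection lost without a stop notice (e.g. network issue)
  }
}
{code}

The design choice is to make SUCCEEDED the outcome that requires positive evidence, rather than the default for any disconnect.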

Normally, in client mode, when the application succeeds the driver stops and the AM loses its connection, so exiting with SUCCESS is fine. But if the disconnection is caused by a network problem, still finishing with a SUCCEEDED final status is incorrect.
YarnClientSchedulerBackend then receives an application report with a SUCCEEDED final status and stops the SparkContext, so an application that actually failed is treated as a normal stop.
{code}
  private class MonitorThread extends Thread {
    private var allowInterrupt = true

    override def run(): Unit = {
      try {
        val YarnAppReport(_, state, diags) =
          client.monitorApplication(appId.get, logApplicationReport = false)
        logError(s"YARN application has exited unexpectedly with state $state! " +
          "Check the YARN application logs for more details.")
        diags.foreach { err =>
          logError(s"Diagnostics message: $err")
        }
        allowInterrupt = false
        sc.stop()
      } catch {
        case _: InterruptedException => logInfo("Interrupting monitor thread")
      }
    }

    def stopMonitor(): Unit = {
      if (allowInterrupt) {
        this.interrupt()
      }
    }
  }
{code}
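To illustrate why the client cannot tell the two cases apart, here is a minimal, self-contained Scala model of the MonitorThread pattern quoted above. It assumes a `LinkedBlockingQueue` as a stand-in for `client.monitorApplication` (which blocks until the YARN application finishes) and a callback as a stand-in for `sc.stop()`; `MonitorSketch`, `AppReport`, and `stopContext` are hypothetical names. The point it shows is that the stop path runs identically whatever final status the report carries, so a disconnect-induced SUCCEEDED looks like any other stop.

{code}
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical model of the MonitorThread structure; not Spark code.
object MonitorSketch {

  final case class AppReport(state: String, finalStatus: String)

  final class Monitor(reports: LinkedBlockingQueue[AppReport], stopContext: AppReport => Unit)
      extends Thread("monitor-sketch") {
    // @volatile here because stopMonitor() reads it from another thread.
    @volatile private var allowInterrupt = true

    override def run(): Unit = {
      try {
        val report = reports.take() // blocks, like monitorApplication
        allowInterrupt = false      // from now on stopMonitor() will not interrupt us
        stopContext(report)         // same path regardless of report.finalStatus
      } catch {
        case _: InterruptedException => () // backend asked the monitor to stop
      }
    }

    // Mirrors the original: only interrupt while still waiting for a report.
    def stopMonitor(): Unit = if (allowInterrupt) interrupt()
  }
}
{code}

Like the original, the check in `stopMonitor()` is not atomic with the flag update; the model keeps that structure rather than fixing it.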


> AM should not just finish with Success when disconnected
> --------------------------------------------------------
>
>                 Key: SPARK-36540
>                 URL: https://issues.apache.org/jira/browse/SPARK-36540
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, YARN
>    Affects Versions: 3.2.0
>            Reporter: angerszhu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)