Posted to issues@spark.apache.org by "XiDuo You (Jira)" <ji...@apache.org> on 2022/05/25 13:23:00 UTC

[jira] [Updated] (SPARK-39291) Fetch blocks and open stream should not respond a closed channel

     [ https://issues.apache.org/jira/browse/SPARK-39291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39291:
------------------------------
    Description: 
If a user cancels and interrupts a reduce task that is fetching shuffle blocks, the channel is closed. However, some ChunkFetchRequest messages may still be in flight, so the server-side TransportRequestHandler still tries to respond to them and fails with the error below. The problem gets worse when the reduce stage is large.

 
{code:java}
22/05/24 21:29:30 ERROR ChunkFetchRequestHandler: Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=736493140719,chunkIndex=6],errorString=java.lang.IllegalStateException: Requested chunk not available since streamId 736493140719 is closed
    at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:103)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
    at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
] to /ip:port; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
{code}
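
The behavior the issue title asks for can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: FakeChannel and respondIfOpen are hypothetical stand-ins, not Spark or Netty classes, and the actual change in Spark may look different. The idea is only that the server checks whether the channel is still open before writing a response, and silently skips the write when the requester has already gone away.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: these types are illustrative stand-ins, not Spark or Netty classes. */
final class ClosedChannelGuardSketch {

    /** Hypothetical stand-in for a network channel. */
    static final class FakeChannel {
        private boolean open = true;
        final List<String> written = new ArrayList<>();

        boolean isOpen() {
            return open;
        }

        void close() {
            open = false;
        }

        /** Mimics a write that fails once the channel has been closed. */
        void writeAndFlush(String message) {
            if (!open) {
                throw new IllegalStateException("channel is closed");
            }
            written.add(message);
        }
    }

    /**
     * Writes the response only if the channel is still open.
     * Returns true if the response was written, false if it was skipped.
     */
    static boolean respondIfOpen(FakeChannel channel, String response) {
        if (!channel.isOpen()) {
            // The requester is gone (for example, the reduce task was cancelled),
            // so drop the response instead of attempting a write that will fail.
            return false;
        }
        channel.writeAndFlush(response);
        return true;
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        System.out.println(respondIfOpen(channel, "chunk-0"));
        channel.close();
        System.out.println(respondIfOpen(channel, "chunk-1"));
    }
}
```

In a real server the channel can still close between the check and the write, so a guard like this would reduce the noisy errors rather than eliminate every failed write.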
 

 


> Fetch blocks and open stream should not respond a closed channel
> ----------------------------------------------------------------
>
>                 Key: SPARK-39291
>                 URL: https://issues.apache.org/jira/browse/SPARK-39291
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.4.0
>            Reporter: XiDuo You
>            Priority: Major


