Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2022/01/11 13:30:00 UTC

[jira] (FLINK-25441) ProducerFailedException will cause task status switch from RUNNING to CANCELED, which will cause the job to hang.

    [ https://issues.apache.org/jira/browse/FLINK-25441 ]


    Piotr Nowojski deleted comment on FLINK-25441:
    ----------------------------------------

was (Author: pnowojski):
[~kevin.cyj] what do you think about this question of mine?
{quote}
is the correct thing to do. A better question would be: why was this error not propagated on the upstream task? It should be the upstream task's switch from `RUNNING` to `FAILED` that triggers the job failover by the JM. In other words, I think the bug here is that the "java.util.concurrent.TimeoutException: Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size'." was not propagated on the upstream task. WDYT Yingjie Cao?
{quote}
Is it because the upstream task has already finished successfully, and this problem is only about reading the already produced result?

> ProducerFailedException will cause task status switch from RUNNING to CANCELED, which will cause the job to hang.
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25441
>                 URL: https://issues.apache.org/jira/browse/FLINK-25441
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.15.0
>            Reporter: Lijie Wang
>            Priority: Major
>
> The {{ProducerFailedException}} extends {{CancelTaskException}}, which causes the task status to switch from RUNNING to CANCELED. As described in FLINK-17726, if a task is directly CANCELED by the TaskManager due to its own runtime issue, the task will not be recovered by the JM, and thus the job hangs.
> Note that this did not cause problems before FLINK-24182 (which unified the failureCause handling and changed the check for CancelTaskException from "{{instanceof CancelTaskException}}" to "{{ExceptionUtils.findThrowable}}"), because the {{ProducerFailedException}} is always wrapped by {{RemoteTransportException}}.
> The example log is as follows:
> {code:java}
> 2021-12-23 21:20:14,965 DEBUG org.apache.flink.runtime.taskmanager.Task                    [] - MultipleInput[945] [Source: HiveSource-tpcds_bin_orc_10000.catalog_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.catalog_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.item, Source: HiveSource-tpcds_bin_orc_10000.web_sales, Source: HiveSource-tpcds_bin_orc_10000.web_sales] -> Calc[885] (143/1024)#0 (8a883116ab601dd5b9ad5d2717d18918) switched from RUNNING to CANCELED due to CancelTaskException: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'k28b09250.eu95sqa.tbsite.net/100.69.96.154:47459'.
>   at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:301)
>   at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:190)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>   at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>   at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>   at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>   at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
>   at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
>   at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>   at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>   at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>   at java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: java.util.concurrent.TimeoutException: Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size'.
>   at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:285)
>   at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:123)
>   at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:234)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
>   at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
>   at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
>   at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
>   at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913)
>   at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:91)
>   at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>   at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>   at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>   at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>   at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>   at java.lang.Thread.run(Thread.java:756)
> Caused by: java.util.concurrent.TimeoutException: Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size'.
>   at org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler.allocateBuffers(SortMergeResultPartitionReadScheduler.java:168)
>   at org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler.run(SortMergeResultPartitionReadScheduler.java:139)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
> {code}
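
For illustration only, here is a minimal, self-contained sketch of why the change of the check described above matters. The exception classes below are simplified stand-ins that only reuse the names from the issue; they are not the real Flink classes. The {{findThrowable}} helper is a hypothetical re-implementation of a cause-chain search in the spirit of {{ExceptionUtils.findThrowable}}, and {{CancelCheckSketch}} and its method names are made up for this example. It assumes the failure arrives as a RemoteTransportException whose cause is a ProducerFailedException, as in the log above.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

public class CancelCheckSketch {

    // Simplified stand-ins for the classes named in the issue (NOT the real Flink classes).
    static class CancelTaskException extends RuntimeException {
        CancelTaskException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class ProducerFailedException extends CancelTaskException {
        ProducerFailedException(Throwable cause) {
            super("Producer failed", cause);
        }
    }

    static class RemoteTransportException extends IOException {
        RemoteTransportException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: walks the cause chain and returns the first match, if any.
    public static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Throwable current = throwable;
        while (current != null) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Check that only looks at the top-level exception.
    public static boolean isCancelByInstanceof(Throwable t) {
        return t instanceof CancelTaskException;
    }

    // Check that searches the whole cause chain.
    public static boolean isCancelByCauseChain(Throwable t) {
        return findThrowable(t, CancelTaskException.class).isPresent();
    }

    // Builds the same nesting as in the log: transport error -> producer failure -> timeout.
    public static Throwable wrappedProducerFailure() {
        Throwable root = new TimeoutException("Buffer request timeout");
        return new RemoteTransportException(
                "Error at remote task manager", new ProducerFailedException(root));
    }

    public static void main(String[] args) {
        Throwable failure = wrappedProducerFailure();
        System.out.println("instanceof check:  " + isCancelByInstanceof(failure));
        System.out.println("cause-chain check: " + isCancelByCauseChain(failure));
    }
}
```

With the top-level check, the wrapped failure is not treated as a cancellation, while the cause-chain search finds the nested ProducerFailedException and classifies the same failure as a cancellation. Under the assumptions above, this matches the switch from RUNNING to CANCELED seen in the log.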


