You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by weijie guo <gu...@gmail.com> on 2022/11/01 06:17:15 UTC

Re: 关于LocalTransportException的优化方向咨询

1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
2.指的是作业jar,每个TM只会下载一次

Best regards,

Weijie


yidan zhao <hi...@gmail.com> 于2022年10月31日周一 19:54写道:

> 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> failed;这种异常的概率。
> 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
>
> weijie guo <gu...@gmail.com> 于2022年10月31日周一 12:54写道:
> >
> > 你好,请问使用的flink版本是多少?
> > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> >
> 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > restore慢等
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > yidan zhao <hi...@gmail.com> 于2022年10月30日周日 11:36写道:
> >
> > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > .network.netty.exception.LocalTransportException
> > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > (1)Sending the partition request to '...' failed;
> > > org.apache.flink.runtime.io
> > > .network.netty.exception.LocalTransportException:
> > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > at org.apache.flink.runtime.io
> > >
> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > at org.apache.flink.runtime.io
> > >
> .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > at java.lang.Thread.run(Thread.java:748)
> > > Caused by:
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > > ChannelPromise)(Unknown Source)
> > > Caused by:
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > writeAddress(..) failed: Connection timed out
> > >
> > > (2)readAddress(..) failed: Connection timed out
> > > org.apache.flink.runtime.io
> > > .network.netty.exception.LocalTransportException:
> > > readAddress(..) failed: Connection timed out (connection to
> > > '10.35.109.149/10.35.109.149:2094')
> > > at org.apache.flink.runtime.io
> > >
> .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > at
> > >
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > at java.lang.Thread.run(Thread.java:748)
> > > Caused by:
> > >
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > readAddress(..) failed: Connection timed out
> > >
> > > 集群信息:standalone 集群,上百台机器。每台都是自研容器,10g内存左右,每个TM提供1个slot,避免任务之间影响。
> > >
> > >
> 作业信息:作业数小于10个。复杂的单任务大概用120并发执行,流量入口16w/s左右,数据处理、5min平滑window处理(目前偶尔buzz在window处)、redis
> > > 异步算子等。
> > >
> > >
> 作业状态:反压目前倒是不严重,基本很少反压了。但经常因为出现如上的连接相关问题,自研容器虽然存在故障重启等可能性,但经过调整当前容器本身出问题概率多天才1次,可是任务可能一天好多次报LocalTransportException异常。
> > >
> > >
> 此外,按照之前的经验,LocalTransportException的出现概率和我的集群运行时间也有关,长期运行大概几天后就会加大概率,然后整体重启后会好转。
> > >
> > >
> > >
> 当前在考虑优化可能方向,大佬们看看有啥建议,实际因为做实验对比周期太长,最好有一定倾向,否则控制影响因素一个一个对比,而且每次对比需要长期观察,时间太久。
> > > (1)slot group分到不同slot group,然后降低整体并发。
> > > 如果问题是tm的task之间连接对太多导致,这个可能会有效。比如A=>B的120并发,改成60并发的A=>B,让AB处于不同slot
> > > group。
> > >          从连接对角度好像降低了复杂度,但单容器流量可能会增大(因为大流量的算子可能集中某60个容器,另外60个容器流量变小可能)。
> > >          其次,如果将任务分为3个slot
> > > group,按60并发,则实际需180个容器。180*每个容器发现故障的概率,一定程度上增大了因容器故障导致的作业重启问题。
> > >          注意如上的连接对我理解其实也是“逻辑概念”,因为tm之间tcp连接是共享的?所以实际‘连接对’也只是
> > > resultSubPartition 的数量问题。
> > >
> > >
> (2)taskmanager.network.netty.num-arenas、taskmanager.network.netty.server.numThreads、taskmanager.network.netty.client.numThreads
> > > 这三参数是否有影响呢,由于tm的slot为1,这3参数默认都是slot数,也就是1。
> > >
> > >
> > >
> > >
> 此外,还有一个问题,并发大了是否会影响任务启动过程速度。我今天启动120并发的任务,task从deploying变成initializing再到running是一点点变的,差不多1-2min才全部变成running。
> > >
>

Re: 关于LocalTransportException的优化方向咨询

Posted by yidan zhao <hi...@gmail.com>.
hi,我们继续实验了reuse=false,max-num-tcp-connections继续为1的情况,也解决了问题。因此是reuse的问题。

yidan zhao <hi...@gmail.com> 于2022年11月21日周一 10:34写道:
>
> 我们这边其实是生产环境一直有类似local/remote transport异常,其中一大类就是 Sending the partition
> request to '...' failed 这种错误。
>
> 当前是 1.15.2,默认 taskmanager.network.max-num-tcp-connections 是 1,
> taskmanager.network.tcp-connection.enable-reuse-across-jobs 是 true。
>
> 当前测试的实际是将taskmanager.network.max-num-tcp-connections
> 改成了30,taskmanager.network.tcp-connection.enable-reuse-across-jobs 改成了
> false,然后目前没出 Sending the partition request to '...' failed 这种错误。之前很频繁。
>
> 我们集群是standalone,每个tm只提供一个slot,单作业大概120个slot左右。
> 之前的表现还有个规律:作业失败重启连续失败,很难自己重启成功。
> 需要人工重启整个集群后,作业自动重启基本就能成功。然后运行个几天后就很容易再出现这个问题。
>
> —— 关于这2个参数,是哪个有效导致,我们后续试着看看。
>
> >
> > Hi yidan,
> >
> > 感谢你的反馈,从之前的描述来看,在修改 taskmanager.network.max-num-tcp-connections 之前,你们任务可能一天报很多次 LocalTransportException。修改之后,有效果。所以有可能是 taskmanager.network.max-num-tcp-connections 导致的。
> >
> > 但我跟进代码后,发现 taskmanager.network.max-num-tcp-connections 与 taskmanager.network.tcp-connection.enable-reuse-across-jobs 两个参数会一起作用。
> >
> > 可以帮忙验证下:taskmanager.network.max-num-tcp-connections 不变,仍然是 默认值 1 ,但是将 taskmanager.network.tcp-connection.enable-reuse-across-jobs 设置为 false 吗?看一下这么配置稳定性是否有改善。
> >
> > 通过这样验证,可以方便定位到底是 taskmanager.network.max-num-tcp-connections 还是 taskmanager.network.tcp-connection.enable-reuse-across-jobs 影响了任务稳定性。
> >
> > 非常期待你的反馈,谢谢。
> >
> > fanrui
> >
> > On 2022/11/18 04:25:47 yidan zhao wrote:
> > > Hi, weijie guo.
> > > 你在 https://issues.apache.org/jira/browse/FLINK-28695
> > > 中提到的解决方案,我们这边实验观察看起来是有效的。 当然我们任务重启的原因大概分几类,其中一类就是这个问题,目前来看修改参数
> > > taskmanager.network.max-num-tcp-connections 是有效的。
> > >
> > > 但是话说这个重用连接为什么会出现这种问题呢?而且这种问题不是出现1次,而是连续重启很多次后才可能恢复。
> > >
> > > >
> > > > 1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
> > > > sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
> > > > 2.指的是作业jar,每个TM只会下载一次
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > yidan zhao <hi...@gmail.com> 于2022年10月31日周一 19:54写道:
> > > >
> > > > > 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> > > > > failed;这种异常的概率。
> > > > > 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> > > > > 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
> > > > >
> > > > > weijie guo <gu...@gmail.com> 于2022年10月31日周一 12:54写道:
> > > > > >
> > > > > > 你好,请问使用的flink版本是多少?
> > > > > > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> > > > > >
> > > > > 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > > > > > restore慢等
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Weijie
> > > > > >
> > > > > >
> > > > > > yidan zhao <hi...@gmail.com> 于2022年10月30日周日 11:36写道:
> > > > > >
> > > > > > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > > > > > .network.netty.exception.LocalTransportException
> > > > > > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > > > > > (1)Sending the partition request to '...' failed;
> > > > > > > org.apache.flink.runtime.io
> > > > > > > .network.netty.exception.LocalTransportException:
> > > > > > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > > > > > at org.apache.flink.runtime.io
> > > > > > >
> > > > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > > > > > at org.apache.flink.runtime.io
> > > > > > >
> > > > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > > > > at java.lang.Thread.run(Thread.java:748)
> > > > > > > Caused by:
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > > > > > > ChannelPromise)(Unknown Source)
> > > > > > > Caused by:
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > > > > > writeAddress(..) failed: Connection timed out
> > > > > > >
> > > > > > > (2)readAddress(..) failed: Connection timed out
> > > > > > > org.apache.flink.runtime.io
> > > > > > > .network.netty.exception.LocalTransportException:
> > > > > > > readAddress(..) failed: Connection timed out (connection to
> > > > > > > '10.35.109.149/10.35.109.149:2094')
> > > > > > > at org.apache.flink.runtime.io
> > > > > > >
> > > > > .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > > > > at
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > > > > at java.lang.Thread.run(Thread.java:748)
> > > > > > > Caused by:
> > > > > > >
> > > > > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > > > > > readAddress(..) failed: Connection timed out
> > > > > > >
> > > > > > > 集群信息:standalone 集群,上百台机器。每台都是自研容器,10g内存左右,每个TM提供1个slot,避免任务之间影响。
> > > > > > >
> > > > > > >
> > > > > 作业信息:作业数小于10个。复杂的单任务大概用120并发执行,流量入口16w/s左右,数据处理、5min平滑window处理(目前偶尔buzz在window处)、redis
> > > > > > > 异步算子等。
> > > > > > >
> > > > > > >
> > > > > 作业状态:反压目前倒是不严重,基本很少反压了。但经常因为出现如上的连接相关问题,自研容器虽然存在故障重启等可能性,但经过调整当前容器本身出问题概率多天才1次,可是任务可能一天好多次报LocalTransportException异常。
> > > > > > >
> > > > > > >
> > > > > 此外,按照之前的经验,LocalTransportException的出现概率和我的集群运行时间也有关,长期运行大概几天后就会加大概率,然后整体重启后会好转。
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > 当前在考虑优化可能方向,大佬们看看有啥建议,实际因为做实验对比周期太长,最好有一定倾向,否则控制影响因素一个一个对比,而且每次对比需要长期观察,时间太久。
> > > > > > > (1)slot group分到不同slot group,然后降低整体并发。
> > > > > > > 如果问题是tm的task之间连接对太多导致,这个可能会有效。比如A=>B的120并发,改成60并发的A=>B,让AB处于不同slot
> > > > > > > group。
> > > > > > >          从连接对角度好像降低了复杂度,但单容器流量可能会增大(因为大流量的算子可能集中某60个容器,另外60个容器流量变小可能)。
> > > > > > >          其次,如果将任务分为3个slot
> > > > > > > group,按60并发,则实际需180个容器。180*每个容器发现故障的概率,一定程度上增大了因容器故障导致的作业重启问题。
> > > > > > >          注意如上的连接对我理解其实也是“逻辑概念”,因为tm之间tcp连接是共享的?所以实际‘连接对’也只是
> > > > > > > resultSubPartition 的数量问题。
> > > > > > >
> > > > > > >
> > > > > (2)taskmanager.network.netty.num-arenas、taskmanager.network.netty.server.numThreads、taskmanager.network.netty.client.numThreads
> > > > > > > 这三参数是否有影响呢,由于tm的slot为1,这3参数默认都是slot数,也就是1。
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > 此外,还有一个问题,并发大了是否会影响任务启动过程速度。我今天启动120并发的任务,task从deploying变成initializing再到running是一点点变的,差不多1-2min才全部变成running。
> > > > > > >
> > > > >
> > >

Re: 关于LocalTransportException的优化方向咨询

Posted by yidan zhao <hi...@gmail.com>.
我们这边其实是生产环境一直有类似local/remote transport异常,其中一大类就是 Sending the partition
request to '...' failed 这种错误。

当前是 1.15.2,默认 taskmanager.network.max-num-tcp-connections 是 1,
taskmanager.network.tcp-connection.enable-reuse-across-jobs 是 true。

当前测试的实际是将taskmanager.network.max-num-tcp-connections
改成了30,taskmanager.network.tcp-connection.enable-reuse-across-jobs 改成了
false,然后目前没出 Sending the partition request to '...' failed 这种错误。之前很频繁。

我们集群是standalone,每个tm只提供一个slot,单作业大概120个slot左右。
之前的表现还有个规律:作业失败重启连续失败,很难自己重启成功。
需要人工重启整个集群后,作业自动重启基本就能成功。然后运行个几天后就很容易再出现这个问题。

—— 关于这2个参数,是哪个有效导致,我们后续试着看看。

>
> Hi yidan,
>
> 感谢你的反馈,从之前的描述来看,在修改 taskmanager.network.max-num-tcp-connections 之前,你们任务可能一天报很多次 LocalTransportException。修改之后,有效果。所以有可能是 taskmanager.network.max-num-tcp-connections 导致的。
>
> 但我跟进代码后,发现 taskmanager.network.max-num-tcp-connections 与 taskmanager.network.tcp-connection.enable-reuse-across-jobs 两个参数会一起作用。
>
> 可以帮忙验证下:taskmanager.network.max-num-tcp-connections 不变,仍然是 默认值 1 ,但是将 taskmanager.network.tcp-connection.enable-reuse-across-jobs 设置为 false 吗?看一下这么配置稳定性是否有改善。
>
> 通过这样验证,可以方便定位到底是 taskmanager.network.max-num-tcp-connections 还是 taskmanager.network.tcp-connection.enable-reuse-across-jobs 影响了任务稳定性。
>
> 非常期待你的反馈,谢谢。
>
> fanrui
>
> On 2022/11/18 04:25:47 yidan zhao wrote:
> > Hi, weijie guo.
> > 你在 https://issues.apache.org/jira/browse/FLINK-28695
> > 中提到的解决方案,我们这边实验观察看起来是有效的。 当然我们任务重启的原因大概分几类,其中一类就是这个问题,目前来看修改参数
> > taskmanager.network.max-num-tcp-connections 是有效的。
> >
> > 但是话说这个重用连接为什么会出现这种问题呢?而且这种问题不是出现1次,而是连续重启很多次后才可能恢复。
> >
> > >
> > > 1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
> > > sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
> > > 2.指的是作业jar,每个TM只会下载一次
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > yidan zhao <hi...@gmail.com> 于2022年10月31日周一 19:54写道:
> > >
> > > > 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> > > > failed;这种异常的概率。
> > > > 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> > > > 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
> > > >
> > > > weijie guo <gu...@gmail.com> 于2022年10月31日周一 12:54写道:
> > > > >
> > > > > 你好,请问使用的flink版本是多少?
> > > > > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> > > > >
> > > > 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > > > > restore慢等
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > > > >
> > > > > yidan zhao <hi...@gmail.com> 于2022年10月30日周日 11:36写道:
> > > > >
> > > > > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > > > > .network.netty.exception.LocalTransportException
> > > > > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > > > > (1)Sending the partition request to '...' failed;
> > > > > > org.apache.flink.runtime.io
> > > > > > .network.netty.exception.LocalTransportException:
> > > > > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > > > > at org.apache.flink.runtime.io
> > > > > >
> > > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > > > > at org.apache.flink.runtime.io
> > > > > >
> > > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > > > at java.lang.Thread.run(Thread.java:748)
> > > > > > Caused by:
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > > > > > ChannelPromise)(Unknown Source)
> > > > > > Caused by:
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > > > > writeAddress(..) failed: Connection timed out
> > > > > >
> > > > > > (2)readAddress(..) failed: Connection timed out
> > > > > > org.apache.flink.runtime.io
> > > > > > .network.netty.exception.LocalTransportException:
> > > > > > readAddress(..) failed: Connection timed out (connection to
> > > > > > '10.35.109.149/10.35.109.149:2094')
> > > > > > at org.apache.flink.runtime.io
> > > > > >
> > > > .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > > > at
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > > > at java.lang.Thread.run(Thread.java:748)
> > > > > > Caused by:
> > > > > >
> > > > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > > > > readAddress(..) failed: Connection timed out
> > > > > >
> > > > > > 集群信息:standalone 集群,上百台机器。每台都是自研容器,10g内存左右,每个TM提供1个slot,避免任务之间影响。
> > > > > >
> > > > > >
> > > > 作业信息:作业数小于10个。复杂的单任务大概用120并发执行,流量入口16w/s左右,数据处理、5min平滑window处理(目前偶尔buzz在window处)、redis
> > > > > > 异步算子等。
> > > > > >
> > > > > >
> > > > 作业状态:反压目前倒是不严重,基本很少反压了。但经常因为出现如上的连接相关问题,自研容器虽然存在故障重启等可能性,但经过调整当前容器本身出问题概率多天才1次,可是任务可能一天好多次报LocalTransportException异常。
> > > > > >
> > > > > >
> > > > 此外,按照之前的经验,LocalTransportException的出现概率和我的集群运行时间也有关,长期运行大概几天后就会加大概率,然后整体重启后会好转。
> > > > > >
> > > > > >
> > > > > >
> > > > 当前在考虑优化可能方向,大佬们看看有啥建议,实际因为做实验对比周期太长,最好有一定倾向,否则控制影响因素一个一个对比,而且每次对比需要长期观察,时间太久。
> > > > > > (1)slot group分到不同slot group,然后降低整体并发。
> > > > > > 如果问题是tm的task之间连接对太多导致,这个可能会有效。比如A=>B的120并发,改成60并发的A=>B,让AB处于不同slot
> > > > > > group。
> > > > > >          从连接对角度好像降低了复杂度,但单容器流量可能会增大(因为大流量的算子可能集中某60个容器,另外60个容器流量变小可能)。
> > > > > >          其次,如果将任务分为3个slot
> > > > > > group,按60并发,则实际需180个容器。180*每个容器发现故障的概率,一定程度上增大了因容器故障导致的作业重启问题。
> > > > > >          注意如上的连接对我理解其实也是“逻辑概念”,因为tm之间tcp连接是共享的?所以实际‘连接对’也只是
> > > > > > resultSubPartition 的数量问题。
> > > > > >
> > > > > >
> > > > (2)taskmanager.network.netty.num-arenas、taskmanager.network.netty.server.numThreads、taskmanager.network.netty.client.numThreads
> > > > > > 这三参数是否有影响呢,由于tm的slot为1,这3参数默认都是slot数,也就是1。
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > 此外,还有一个问题,并发大了是否会影响任务启动过程速度。我今天启动120并发的任务,task从deploying变成initializing再到running是一点点变的,差不多1-2min才全部变成running。
> > > > > >
> > > >
> >

Re: 关于LocalTransportException的优化方向咨询

Posted by Rui Fan <fa...@apache.org>.
Hi yidan,

感谢你的反馈,从之前的描述来看,在修改 taskmanager.network.max-num-tcp-connections 之前,你们任务可能一天报很多次 LocalTransportException。修改之后,有效果。所以有可能是 taskmanager.network.max-num-tcp-connections 导致的。

但我跟进代码后,发现 taskmanager.network.max-num-tcp-connections 与 taskmanager.network.tcp-connection.enable-reuse-across-jobs 两个参数会一起作用。

可以帮忙验证下:taskmanager.network.max-num-tcp-connections 不变,仍然是 默认值 1 ,但是将 taskmanager.network.tcp-connection.enable-reuse-across-jobs 设置为 false 吗?看一下这么配置稳定性是否有改善。

通过这样验证,可以方便定位到底是 taskmanager.network.max-num-tcp-connections 还是 taskmanager.network.tcp-connection.enable-reuse-across-jobs 影响了任务稳定性。

非常期待你的反馈,谢谢。

fanrui

On 2022/11/18 04:25:47 yidan zhao wrote:
> Hi, weijie guo.
> 你在 https://issues.apache.org/jira/browse/FLINK-28695
> 中提到的解决方案,我们这边实验观察看起来是有效的。 当然我们任务重启的原因大概分几类,其中一类就是这个问题,目前来看修改参数
> taskmanager.network.max-num-tcp-connections 是有效的。
> 
> 但是话说这个重用连接为什么会出现这种问题呢?而且这种问题不是出现1次,而是连续重启很多次后才可能恢复。
> 
> >
> > 1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
> > sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
> > 2.指的是作业jar,每个TM只会下载一次
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > yidan zhao <hi...@gmail.com> 于2022年10月31日周一 19:54写道:
> >
> > > 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> > > failed;这种异常的概率。
> > > 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> > > 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
> > >
> > > weijie guo <gu...@gmail.com> 于2022年10月31日周一 12:54写道:
> > > >
> > > > 你好,请问使用的flink版本是多少?
> > > > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> > > >
> > > 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > > > restore慢等
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > yidan zhao <hi...@gmail.com> 于2022年10月30日周日 11:36写道:
> > > >
> > > > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > > > .network.netty.exception.LocalTransportException
> > > > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > > > (1)Sending the partition request to '...' failed;
> > > > > org.apache.flink.runtime.io
> > > > > .network.netty.exception.LocalTransportException:
> > > > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > > > at org.apache.flink.runtime.io
> > > > >
> > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > > > at org.apache.flink.runtime.io
> > > > >
> > > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > > at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by:
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > > > > ChannelPromise)(Unknown Source)
> > > > > Caused by:
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > > > writeAddress(..) failed: Connection timed out
> > > > >
> > > > > (2)readAddress(..) failed: Connection timed out
> > > > > org.apache.flink.runtime.io
> > > > > .network.netty.exception.LocalTransportException:
> > > > > readAddress(..) failed: Connection timed out (connection to
> > > > > '10.35.109.149/10.35.109.149:2094')
> > > > > at org.apache.flink.runtime.io
> > > > >
> > > .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > > at
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > > at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by:
> > > > >
> > > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > > > readAddress(..) failed: Connection timed out
> > > > >
> > > > > 集群信息:standalone 集群,上百台机器。每台都是自研容器,10g内存左右,每个TM提供1个slot,避免任务之间影响。
> > > > >
> > > > >
> > > 作业信息:作业数小于10个。复杂的单任务大概用120并发执行,流量入口16w/s左右,数据处理、5min平滑window处理(目前偶尔buzz在window处)、redis
> > > > > 异步算子等。
> > > > >
> > > > >
> > > 作业状态:反压目前倒是不严重,基本很少反压了。但经常因为出现如上的连接相关问题,自研容器虽然存在故障重启等可能性,但经过调整当前容器本身出问题概率多天才1次,可是任务可能一天好多次报LocalTransportException异常。
> > > > >
> > > > >
> > > 此外,按照之前的经验,LocalTransportException的出现概率和我的集群运行时间也有关,长期运行大概几天后就会加大概率,然后整体重启后会好转。
> > > > >
> > > > >
> > > > >
> > > 当前在考虑优化可能方向,大佬们看看有啥建议,实际因为做实验对比周期太长,最好有一定倾向,否则控制影响因素一个一个对比,而且每次对比需要长期观察,时间太久。
> > > > > (1)slot group分到不同slot group,然后降低整体并发。
> > > > > 如果问题是tm的task之间连接对太多导致,这个可能会有效。比如A=>B的120并发,改成60并发的A=>B,让AB处于不同slot
> > > > > group。
> > > > >          从连接对角度好像降低了复杂度,但单容器流量可能会增大(因为大流量的算子可能集中某60个容器,另外60个容器流量变小可能)。
> > > > >          其次,如果将任务分为3个slot
> > > > > group,按60并发,则实际需180个容器。180*每个容器发现故障的概率,一定程度上增大了因容器故障导致的作业重启问题。
> > > > >          注意如上的连接对我理解其实也是“逻辑概念”,因为tm之间tcp连接是共享的?所以实际‘连接对’也只是
> > > > > resultSubPartition 的数量问题。
> > > > >
> > > > >
> > > (2)taskmanager.network.netty.num-arenas、taskmanager.network.netty.server.numThreads、taskmanager.network.netty.client.numThreads
> > > > > 这三参数是否有影响呢,由于tm的slot为1,这3参数默认都是slot数,也就是1。
> > > > >
> > > > >
> > > > >
> > > > >
> > > 此外,还有一个问题,并发大了是否会影响任务启动过程速度。我今天启动120并发的任务,task从deploying变成initializing再到running是一点点变的,差不多1-2min才全部变成running。
> > > > >
> > >
> 

Re: 关于LocalTransportException的优化方向咨询

Posted by yidan zhao <hi...@gmail.com>.
Hi, weijie guo.
你在 https://issues.apache.org/jira/browse/FLINK-28695
中提到的解决方案,我们这边实验观察看起来是有效的。 当然我们任务重启的原因大概分几类,其中一类就是这个问题,目前来看修改参数
taskmanager.network.max-num-tcp-connections 是有效的。

但是话说这个重用连接为什么会出现这种问题呢?而且这种问题不是出现1次,而是连续重启很多次后才可能恢复。

>
> 1.all to all的边的话,你这个例子把并发降下来肯定连接数要少很多的。 slot
> sharing的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
> 2.指的是作业jar,每个TM只会下载一次
>
> Best regards,
>
> Weijie
>
>
> yidan zhao <hi...@gmail.com> 于2022年10月31日周一 19:54写道:
>
> > 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> > failed;这种异常的概率。
> > 问题2,你提到的下载jar是指任务jar还是flink的jar。flink的jar不需要,因为我是standalone集群。
> > 任务jar的话,这出现另外一个问题,如果一个TM分配到120*10=1200个task,那么任务jar不会分发这么多次吧。
> >
> > weijie guo <gu...@gmail.com> 于2022年10月31日周一 12:54写道:
> > >
> > > 你好,请问使用的flink版本是多少?
> > > 1.15的话TM间是有connection reuse的,默认TM间建立一个物理TCP连接。
> > >
> > 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> > > restore慢等
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > yidan zhao <hi...@gmail.com> 于2022年10月30日周日 11:36写道:
> > >
> > > > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > > > .network.netty.exception.LocalTransportException
> > > > 异常,具体异常cause有如下情况,按照出现频率从高到底列举。
> > > > (1)Sending the partition request to '...' failed;
> > > > org.apache.flink.runtime.io
> > > > .network.netty.exception.LocalTransportException:
> > > > Sending the partition request to '/10.35.215.18:2065 (#0)' failed.
> > > > at org.apache.flink.runtime.io
> > > >
> > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:145)
> > > > at org.apache.flink.runtime.io
> > > >
> > .network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:133)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:1017)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:878)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > at java.lang.Thread.run(Thread.java:748)
> > > > Caused by:
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.StacklessClosedChannelException
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
> > > > ChannelPromise)(Unknown Source)
> > > > Caused by:
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > > writeAddress(..) failed: Connection timed out
> > > >
> > > > (2)readAddress(..) failed: Connection timed out
> > > > org.apache.flink.runtime.io
> > > > .network.netty.exception.LocalTransportException:
> > > > readAddress(..) failed: Connection timed out (connection to
> > > > '10.35.109.149/10.35.109.149:2094')
> > > > at org.apache.flink.runtime.io
> > > >
> > .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:168)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> > > > at
> > > >
> > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> > > > at java.lang.Thread.run(Thread.java:748)
> > > > Caused by:
> > > >
> > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> > > > readAddress(..) failed: Connection timed out
> > > >
> > > > 集群信息:standalone 集群,上百台机器。每台都是自研容器,10g内存左右,每个TM提供1个slot,避免任务之间影响。
> > > >
> > > >
> > 作业信息:作业数小于10个。复杂的单任务大概用120并发执行,流量入口16w/s左右,数据处理、5min平滑window处理(目前偶尔buzz在window处)、redis
> > > > 异步算子等。
> > > >
> > > >
> > 作业状态:反压目前倒是不严重,基本很少反压了。但经常因为出现如上的连接相关问题,自研容器虽然存在故障重启等可能性,但经过调整当前容器本身出问题概率多天才1次,可是任务可能一天好多次报LocalTransportException异常。
> > > >
> > > >
> > 此外,按照之前的经验,LocalTransportException的出现概率和我的集群运行时间也有关,长期运行大概几天后就会加大概率,然后整体重启后会好转。
> > > >
> > > >
> > > >
> > 当前在考虑优化可能方向,大佬们看看有啥建议,实际因为做实验对比周期太长,最好有一定倾向,否则控制影响因素一个一个对比,而且每次对比需要长期观察,时间太久。
> > > > (1)slot group分到不同slot group,然后降低整体并发。
> > > > 如果问题是tm的task之间连接对太多导致,这个可能会有效。比如A=>B的120并发,改成60并发的A=>B,让AB处于不同slot
> > > > group。
> > > >          从连接对角度好像降低了复杂度,但单容器流量可能会增大(因为大流量的算子可能集中某60个容器,另外60个容器流量变小可能)。
> > > >          其次,如果将任务分为3个slot
> > > > group,按60并发,则实际需180个容器。180*每个容器发现故障的概率,一定程度上增大了因容器故障导致的作业重启问题。
> > > >          注意如上的连接对我理解其实也是“逻辑概念”,因为tm之间tcp连接是共享的?所以实际‘连接对’也只是
> > > > resultSubPartition 的数量问题。
> > > >
> > > >
> > (2)taskmanager.network.netty.num-arenas、taskmanager.network.netty.server.numThreads、taskmanager.network.netty.client.numThreads
> > > > 这三参数是否有影响呢,由于tm的slot为1,这3参数默认都是slot数,也就是1。
> > > >
> > > >
> > > >
> > > >
> > 此外,还有一个问题,并发大了是否会影响任务启动过程速度。我今天启动120并发的任务,task从deploying变成initializing再到running是一点点变的,差不多1-2min才全部变成running。
> > > >
> >