You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Jacob <17...@163.com> on 2021/06/03 07:27:43 UTC

Flink checkpoint 速度很慢 问题排查

Dear all,

我有一个两个Flink Job A和B

A job任务是消费kafka topic01数据,经过一系列逻辑加工,最终将数据sink到topic02

其中加工大致过程是:消费到topic01消息后,根据数据相关字段查询redis、查询hbase,然后组装业务数据(过程比较复杂),然后将业务数据写到另一个topic02,30s做一次checkpoint,state大小只有几十kb,但做一次checkpoint平均需要两分钟,导致topic01消息产生堆积,实时性降低。

B job任务简单,消费上一步的的业务数据topic02,开一个半个小时的窗口将数据进行聚合(keyby、max)之后写到orc
file,state大小几百兆,但耗时是秒级别。

我比较疑惑的是为什么A job的state那么小,但checkpoint却很耗时,不知道从哪个角度去优化该问题。

请各位指教



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by HunterXHunter <13...@qq.com>.
一般是 Job A出现背压了,checkpoint的时候是要等背压的数据都处理完了才会处理barrier。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:Flink checkpoint 速度很慢 问题排查

Posted by Jacob <17...@163.com>.
@lian 谢谢回复

我通过webui查询,没有背压的情况。
hbase性能这块确实存在一定问题,公司的hbase性能一直不佳,但我的程序中,是先从缓存redis中取,大部分数据都能查到,只有少部分会查hbase。
谢谢你提供的思路,我将会从代码级别考虑,看是否能优化相关逻辑



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:Flink checkpoint 速度很慢 问题排查

Posted by Jacob <17...@163.com>.
@lian 谢谢回复

我通过webui查询,没有背压的情况。
hbase性能这块确实存在一定问题,公司的hbase性能一直不佳,但我的程序中,是先从缓存redis中取,大部分数据都能查到,只有少部分会查hbase。
谢谢你提供的思路,我将会从代码级别考虑,看是否能优化相关逻辑



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复:Flink checkpoint 速度很慢 问题排查

Posted by lian <lz...@126.com>.
排查一下任务在执行过程中,是否有背压,以及在ck过程中,buffer积压了多少数据量。
很可能是在访问hbase的过程,性能不是很好。


在2021年06月03日 15:27,Jacob 写道:
Dear all,

我有一个两个Flink Job A和B

A job任务是消费kafka topic01数据,经过一系列逻辑加工,最终将数据sink到topic02

其中加工大致过程是:消费到topic01消息后,根据数据相关字段查询redis、查询hbase,然后组装业务数据(过程比较复杂),然后将业务数据写到另一个topic02,30s做一次checkpoint,state大小只有几十kb,但做一次checkpoint平均需要两分钟,导致topic01消息产生堆积,实时性降低。

B job任务简单,消费上一步的的业务数据topic02,开一个半个小时的窗口将数据进行聚合(keyby、max)之后写到orc
file,state大小几百兆,但耗时是秒级别。

我比较疑惑的是为什么A job的state那么小,但checkpoint却很耗时,不知道从哪个角度去优化该问题。

请各位指教



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by Jacob <17...@163.com>.
嗯嗯
明白了,感谢大神最近的指导!



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by yidan zhao <hi...@gmail.com>.
不是的哈,那个方法本身还是同步调用的。就是需要你自己保证逻辑的异步。

Jacob <17...@163.com> 于2021年6月8日周二 上午9:31写道:
>
> @nobleyd
> 谢谢大神指导,前两天休息没看邮件,才回复,抱歉
>
> 我后面把代码大概改成如下样子,checkpoint时间确实得到了改善,job运行几天正常
>
> 我提供的线程池在asyncInvoke方法内部跑,这样是不是不合适呀,asyncInvoke方法本身是不是就是封装好的异步方法,就不用单独启线程池了吧?直接在asyncInvoke方法内部写处理逻辑就好。
>
> public class AsyncProcessFunction extends RichAsyncFunction<Map&lt;String,
> String>, List<JSONObject>> {
>
>      private transient ExecutorService executorpool;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>       executorpool= Executors.newFixedThreadPool(80);
>     }
>
>     @Override
>     public void asyncInvoke(Map<String, String> message,
> ResultFuture<List&lt;JSONObject>> resultFuture){
>         executorpool.submit(()->{
>                   // 处理逻辑
>                       ..............
>               resultFuture.complete(Collections.singletonList(...));
>         });
>     }
> }
>
>
>
> -----
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by Jacob <17...@163.com>.
@nobleyd
谢谢大神指导,前两天休息没看邮件,才回复,抱歉

我后面把代码大概改成如下样子,checkpoint时间确实得到了改善,job运行几天正常

我提供的线程池在asyncInvoke方法内部跑,这样是不是不合适呀,asyncInvoke方法本身是不是就是封装好的异步方法,就不用单独启线程池了吧?直接在asyncInvoke方法内部写处理逻辑就好。

public class AsyncProcessFunction extends RichAsyncFunction<Map&lt;String,
String>, List<JSONObject>> {

     private transient ExecutorService executorpool;

    @Override
    public void open(Configuration parameters) throws Exception {
      executorpool= Executors.newFixedThreadPool(80);
    }

    @Override
    public void asyncInvoke(Map<String, String> message,
ResultFuture<List&lt;JSONObject>> resultFuture){
        executorpool.submit(()->{
                  // 处理逻辑
                      ..............
              resultFuture.complete(Collections.singletonList(...));
        });
    }
}



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by yidan zhao <hi...@gmail.com>.
可以的,本身异步操作的本质就是线程池。 至于是你自己提供线程池,去执行某个同步操作。还是直接使用client/sdk等封装的异步方法内部默认的线程池这个无所谓。

Jacob <17...@163.com> 于2021年6月5日周六 下午1:15写道:
>
> thanks,
>
> 我查看了相关文档[1] 由于redis以及hbase的交互地方比较多,比较零散,不光是查询,还有回写redis
>
> 我打算把之前map算子的整段逻辑以线程池的形式丢在asyncInvoke()方法内部,不知道合适与否,这样数据的顺序性就无法得到保障了吧?
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/>
>
>
>
> -----
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by Jacob <17...@163.com>.
thanks,

我查看了相关文档[1] 由于redis以及hbase的交互地方比较多,比较零散,不光是查询,还有回写redis

我打算把之前map算子的整段逻辑以线程池的形式丢在asyncInvoke()方法内部,不知道合适与否,这样数据的顺序性就无法得到保障了吧?



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/>  



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by yidan zhao <hi...@gmail.com>.
官方就有文档。其实本质就是一个异步操作假设1ms,那么同步操作的1s也就能1000个操作,qps太低了。异步的话可以大大提高qps。

Jacob <17...@163.com> 于2021年6月4日周五 下午5:58写道:
>
> 嗯嗯 你的描述是对的,job的执行过程大致就是如此
>
>
> 我明白你意思了
>
> 谢谢你提供的思路,我需要学习一下这个异步算子,之前从未接触过,不太清楚这具体是一个怎样的流程,请问你那边有相关的demo吗,或者该去具体去看哪部分的内容?
>
>
>
>
>
> -----
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by Jacob <17...@163.com>.
嗯嗯 你的描述是对的,job的执行过程大致就是如此


我明白你意思了

谢谢你提供的思路,我需要学习一下这个异步算子,之前从未接触过,不太清楚这具体是一个怎样的流程,请问你那边有相关的demo吗,或者该去具体去看哪部分的内容?





-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by yidan zhao <hi...@gmail.com>.
我懂你意思,每个输入数据,经过redis、hbase等访问,以及相关调整(比如字段设置等),然后这个记录需要继续作为此算子的输出是吧。

我表达的是指你需要用异步访问redis、hbase方式,这个配合flink的异步算子去实现。所以你说的那个需求基于异步的是可以满足的。

Jacob <17...@163.com> 于2021年6月4日周五 下午3:21写道:
>
> @nobleyd 谢谢回复
>
> 你任务A中的redis和hbase是异步还是同步访问,------------------- 同步
>
> 你估计用的是对齐检查点是吧? -------------------是的
>
>
> 同步访问,是因为我们要及时生成新数据,换做异步就无法即时拿到最新的结果数据了
>
> 检查点我刚调整为非对齐方式了,从做完的十个checkpoint来看,state大小确实增加了,但速度尚未变快
>
>
> 消息量确实比较大,处理逻辑也较为复杂,处理逻辑算子的并行度我给了100,source并行度等于topic分区数
>
>
>
> -----
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by Jacob <17...@163.com>.
@nobleyd 谢谢回复

你任务A中的redis和hbase是异步还是同步访问,------------------- 同步

你估计用的是对齐检查点是吧? -------------------是的


同步访问,是因为我们要及时生成新数据,换做异步就无法即时拿到最新的结果数据了

检查点我刚调整为非对齐方式了,从做完的十个checkpoint来看,state大小确实增加了,但速度尚未变快


消息量确实比较大,处理逻辑也较为复杂,处理逻辑算子的并行度我给了100,source并行度等于topic分区数



-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by yidan zhao <hi...@gmail.com>.
你任务A中的redis和hbase是异步还是同步访问,同步是肯定不行的。ckpt小是因为没啥状态,自然就小,时间长可能是数据对齐时间长,你估计用的是对齐检查点是吧?
 如果换成非对齐检查点,时间应该能降下来,但是状态会变得很大,你可以试试。
最佳做法是,改造成异步的,不能同步。

JasonLee <17...@163.com> 于2021年6月4日周五 上午10:57写道:
>
> hi
>
> source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask
> 空跑,浪费资源,你只需要把 map 的并行度调大即可.
>
>
>
> -----
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by JasonLee <17...@163.com>.
hi

source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask
空跑,浪费资源,你只需要把 map 的并行度调大即可.



-----
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by Jacob <17...@163.com>.
@JasonLee 谢谢回复

A job 的背压情况如下图

<http://apache-flink.147419.n8.nabble.com/file/t1162/backpressure.png> 

我清楚job处理数据速度的确赶不上消息的生产速度这一事实,但暂时想不到一个合理的解决办法,并行度都已经设置的比较大了(从等于topic分区数量已经调整为大于partition数量了)。

我把各个task的并行度设置是一样的,让他们链在一个task上,从而优化线程切换的性能
其中 Map算子是最耗时的,所有的逻辑和数据加工都在这个Map算子,前后两个Flat Map
都是简单的将List数据扁平化而已,没有什么耗时操作。开始它们的并行度我设25(topic01
partition数量、消息速率是:1000~2000条/s),后面直接改成80,但并没有明显效果。





-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

Posted by JasonLee <17...@163.com>.
hi

你理解的可能有点偏差,应该是因为任务出现了反压或者数据倾斜的问题导致了cp时间长,01消息堆积说明已经反压到source端了,需要先定位反压的位置,看是具体什么原因导致的,然后再根据情况解决.



-----
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/