You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Guojun Li <gj...@gmail.com> on 2023/03/02 03:46:22 UTC

Re: Re: Flink SQL 如何优化以及处理反压

可以看一下反压算子是否出现在同一台机器(排除单点故障)。比如使用了 rocksdb + hdd 盘;单机负载过高;磁盘打满等。
如果不是单点故障,可以打 jstack 查看对应的线程具体在执行什么样的操作,再进行相应的逻辑优化。

On Tue, Jan 31, 2023 at 6:01 PM lxk <lx...@163.com> wrote:

> 现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。
> 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-01-31 17:45:15,"weijie guo" <gu...@gmail.com> 写道:
> >最好先找到导致下游处理过慢的瓶颈算子,适当扩大一下并发。如果还不行,看下jstack的情况,可能需要调整逻辑。
> >
> >Best regards,
> >
> >Weijie
> >
> >
> >ssmq <37...@qq.com.invalid> 于2023年1月31日周二 17:22写道:
> >
> >> 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了
> >>
> >>
> >> 发件人: lxk
> >> 发送时间: 2023年1月31日 15:16
> >> 收件人: user-zh@flink.apache.org
> >> 主题: Flink SQL 如何优化以及处理反压
> >>
> >> Flink版本:1.16.0
> >> 目前在使用Flink SQL进行多流关联,并写入Clickhouse中
> >> 具体代码如下:
> >> select \
> >> header.id as id, \
> >> LAST_VALUE(header.order_status), \
> >> LAST_VALUE(header.customer_id), \
> >> LAST_VALUE(header.shop_id), \
> >> LAST_VALUE(header.parent_order_id), \
> >> LAST_VALUE(header.order_at), \
> >> LAST_VALUE(header.pay_at), \
> >> LAST_VALUE(header.channel_id), \
> >> LAST_VALUE(header.root_order_id), \
> >> LAST_VALUE(header.last_updated_at), \
> >> item.id as item_id, \
> >> LAST_VALUE(item.order_id) as order_id, \
> >> LAST_VALUE(item.row_num), \
> >> LAST_VALUE(item.goods_id), \
> >> LAST_VALUE(item.s_sku_code), \
> >> LAST_VALUE(item.qty), \
> >> LAST_VALUE(item.p_paid_sub_amt), \
> >> LAST_VALUE(item.p_sp_sub_amt), \
> >> LAST_VALUE(item.bom_type), \
> >> LAST_VALUE(item.last_updated_at) as item_last_updated_at, \
> >> LAST_VALUE(item.display_qty), \
> >> LAST_VALUE(delivery.del_type), \
> >> LAST_VALUE(delivery.time_slot_type), \
> >> LAST_VALUE(delivery.time_slot_date), \
> >> LAST_VALUE(delivery.time_slot_time_from), \
> >> LAST_VALUE(delivery.time_slot_time_to), \
> >> LAST_VALUE(delivery.sku_delivery_type), \
> >> LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \
> >> LAST_VALUE(promotion.id) as promo_id, \
> >> LAST_VALUE(promotion.order_item_id), \
> >> LAST_VALUE(promotion.p_promo_amt), \
> >> LAST_VALUE(promotion.promotion_category), \
> >> LAST_VALUE(promotion.promo_type), \
> >> LAST_VALUE(promotion.promo_sub_type), \
> >> LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \
> >> LAST_VALUE(promotion.promotion_cost) \
> >> from \
> >>   item \
> >>   join \
> >>   header  \
> >>   on item.order_id = header.id \
> >>   left join \
> >>   delivery \
> >>   on item.order_id = delivery.order_id \
> >>   left join \
> >>   promotion \
> >>   on item.id =promotion.order_item_id \
> >>   group by header.id,item.id
> >> 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉:
> >> https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg
> >>
> >> 参考了京东的一篇文章
> >>
> https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc
> >> ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。
> >>
> >> conf.setString("table.exec.mini-batch.enabled", "true");
> >> conf.setString("table.exec.mini-batch.allow-latency", "15 s");
> >> conf.setString("table.exec.mini-batch.size", "5000");
> >> conf.setString("table.exec.state.ttl", "86400 s");
> >> conf.setString("table.exec.disabled-operators", "NestedLoopJoin");
> >> conf.setString("table.optimizer.join.broadcast-threshold", "-1");
> >> conf.setString("table.optimizer.multiple-input-enabled", "true");
> >> conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED");
> >> conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8");
> >> 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段?
> >>
> >>
> >>
> >>
>