You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Benchao Li <li...@apache.org> on 2022/01/06 12:43:00 UTC

Re: Re: flink sql回撤流sink优化问题

这个问题可以用mini-batch[1]来解决呀

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation

casel.chen <ca...@126.com> 于2021年12月26日周日 18:01写道:

> 你说的是upsert-kafka的这两个参数吗?
>
> sink.buffer-flush.max-rows
> sink.buffer-flush.interval
> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-12-25 22:54:19,"郭伟权" <gw...@gmail.com> 写道:
>
> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
> >
> >casel.chen <ca...@126.com> 于2021年12月23日周四 08:15写道:
> >
> >> flink sql中aggregate without
> >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
> >>
> >>
> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
> >>
> >> orderid.   category    dt                                          amt
> >>
> >> 订单id     商品类型   购买时间(yyyyMMddHH)      购买金额
> >>
> >>
> >>
> >>
> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
> >>
> >>
> >>
> >> INSERT INTO mysql_sink_table
> >>
> >> SELECT category, dt, LAST_VALUE(total)
> >>
> >> OVER (
> >>
> >>   PARTITION BY category
> >>
> >>   ORDER BY PROCTIME()
> >>
> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >>
> >> ) AS var1
> >>
> >> FROM (
> >>
> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
> >>
> >> );
>


-- 

Best,
Benchao Li

Re: Re: Re: flink sql回撤流sink优化问题

Posted by Benchao Li <li...@apache.org>.
mini-batch对aggregate算子是有效的,开启了之后它的输出会降低一些,从而降低了sink的输出压力。

casel.chen <ca...@126.com> 于2022年1月7日周五 07:42写道:

> mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-01-06 20:43:00,"Benchao Li" <li...@apache.org> 写道:
> >这个问题可以用mini-batch[1]来解决呀
> >
> >[1]
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
> >
> >casel.chen <ca...@126.com> 于2021年12月26日周日 18:01写道:
> >
> >> 你说的是upsert-kafka的这两个参数吗?
> >>
> >> sink.buffer-flush.max-rows
> >> sink.buffer-flush.interval
> >> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
> >> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-12-25 22:54:19,"郭伟权" <gw...@gmail.com> 写道:
> >>
> >>
> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
> >> >
> >> >casel.chen <ca...@126.com> 于2021年12月23日周四 08:15写道:
> >> >
> >> >> flink sql中aggregate without
> >> >>
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> >> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> >> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
> >> >>
> >> >>
> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
> >> >>
> >> >> orderid.   category    dt
> amt
> >> >>
> >> >> 订单id     商品类型   购买时间(yyyyMMddHH)      购买金额
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
> >> >>
> >> >>
> >> >>
> >> >> INSERT INTO mysql_sink_table
> >> >>
> >> >> SELECT category, dt, LAST_VALUE(total)
> >> >>
> >> >> OVER (
> >> >>
> >> >>   PARTITION BY category
> >> >>
> >> >>   ORDER BY PROCTIME()
> >> >>
> >> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >> >>
> >> >> ) AS var1
> >> >>
> >> >> FROM (
> >> >>
> >> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category,
> dt
> >> >>
> >> >> );
> >>
> >
> >
> >--
> >
> >Best,
> >Benchao Li
>


-- 

Best,
Benchao Li

Re:Re: Re: flink sql回撤流sink优化问题

Posted by "casel.chen" <ca...@126.com>.
mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。

















在 2022-01-06 20:43:00,"Benchao Li" <li...@apache.org> 写道:
>这个问题可以用mini-batch[1]来解决呀
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
>
>casel.chen <ca...@126.com> 于2021年12月26日周日 18:01写道:
>
>> 你说的是upsert-kafka的这两个参数吗?
>>
>> sink.buffer-flush.max-rows
>> sink.buffer-flush.interval
>> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
>> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2021-12-25 22:54:19,"郭伟权" <gw...@gmail.com> 写道:
>>
>> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
>> >
>> >casel.chen <ca...@126.com> 于2021年12月23日周四 08:15写道:
>> >
>> >> flink sql中aggregate without
>> >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
>> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
>> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>> >>
>> >>
>> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>> >>
>> >> orderid.   category    dt                                          amt
>> >>
>> >> 订单id     商品类型   购买时间(yyyyMMddHH)      购买金额
>> >>
>> >>
>> >>
>> >>
>> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>> >>
>> >>
>> >>
>> >> INSERT INTO mysql_sink_table
>> >>
>> >> SELECT category, dt, LAST_VALUE(total)
>> >>
>> >> OVER (
>> >>
>> >>   PARTITION BY category
>> >>
>> >>   ORDER BY PROCTIME()
>> >>
>> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>> >>
>> >> ) AS var1
>> >>
>> >> FROM (
>> >>
>> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>> >>
>> >> );
>>
>
>
>-- 
>
>Best,
>Benchao Li