You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by guomuhua <66...@qq.com> on 2021/03/23 12:02:43 UTC
flink sql count distonct 优化
在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE;
或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
set
table.optimizer.distinct-agg.split.bucket-num=1024;
还需要对应的将SQL改写为两段式吗?
例如:
原SQL:
SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
对所需DISTINCT字段buy_id模1024自动打散后,SQL:
SELECT day, SUM(cnt) total
FROM (
SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day
还是flink会帮我自动改写SQL,我不用关心?
另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
<http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql count distonct 优化
Posted by Robin Zhang <vi...@outlook.com>.
Hi,Jark
我理解疑问中的sql是一个普通的agg操作,只不过分组的键是时间字段,不知道您说的 `我看你的作业里面是window agg`
,这个怎么理解
Best,
Robin
Jark wrote
>> 如果不是window agg,开启参数后flink会自动打散是吧
> 是的
>
>> 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗?
> 文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
>
> On Fri, 26 Mar 2021 at 11:00, guomuhua <
> 663021157@
>> wrote:
>
>> Jark wrote
>> > 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
>> > agg支持这个参数了。可以期待下。
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang <
>>
>> > vincent2015qdlg@
>>
>> > >
>> > wrote:
>> >
>> >> Hi,guomuhua
>> >> 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>> >>
>> >> Best,
>> >> Robin
>> >>
>> >>
>> >> guomuhua wrote
>> >> > 在SQL中,如果开启了 local-global 参数:set
>> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
>> >> > 或者开启了Partial-Final 参数:set
>> >> table.optimizer.distinct-agg.split.enabled=true;
>> >> > set
>> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
>> >> > 还需要对应的将SQL改写为两段式吗?
>> >> > 例如:
>> >> > 原SQL:
>> >> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>> >> >
>> >> > 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
>> >> > SELECT day, SUM(cnt) total
>> >> > FROM (
>> >> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>> >> > FROM T GROUP BY day, MOD(buy_id, 1024))
>> >> > GROUP BY day
>> >> >
>> >> > 还是flink会帮我自动改写SQL,我不用关心?
>> >> >
>> >> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
>> >> > <
>> >>
>> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>
>> ;
>> >>
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>> 感谢,如果不是window agg,开启参数后flink会自动打散是吧。那关于window agg,
>> 不能自动打散,这部分的介绍,在文档中可以找到吗?具体在哪里呢?还是需要从源码里找呢?望指教。再次感谢
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql count distonct 优化
Posted by Jark Wu <im...@gmail.com>.
> 如果不是window agg,开启参数后flink会自动打散是吧
是的
> 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗?
文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。
Best,
Jark
[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
On Fri, 26 Mar 2021 at 11:00, guomuhua <66...@qq.com> wrote:
> Jark wrote
> > 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
> > agg支持这个参数了。可以期待下。
> >
> > Best,
> > Jark
> >
> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang <
>
> > vincent2015qdlg@
>
> > >
> > wrote:
> >
> >> Hi,guomuhua
> >> 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
> >>
> >> Best,
> >> Robin
> >>
> >>
> >> guomuhua wrote
> >> > 在SQL中,如果开启了 local-global 参数:set
> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> >> > 或者开启了Partial-Final 参数:set
> >> table.optimizer.distinct-agg.split.enabled=true;
> >> > set
> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
> >> > 还需要对应的将SQL改写为两段式吗?
> >> > 例如:
> >> > 原SQL:
> >> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >> >
> >> > 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> >> > SELECT day, SUM(cnt) total
> >> > FROM (
> >> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> >> > FROM T GROUP BY day, MOD(buy_id, 1024))
> >> > GROUP BY day
> >> >
> >> > 还是flink会帮我自动改写SQL,我不用关心?
> >> >
> >> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> >> > <
> >>
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>
> ;
> >>
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
> 感谢,如果不是window agg,开启参数后flink会自动打散是吧。那关于window agg,
> 不能自动打散,这部分的介绍,在文档中可以找到吗?具体在哪里呢?还是需要从源码里找呢?望指教。再次感谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Re: flink sql count distonct 优化
Posted by guomuhua <66...@qq.com>.
Jark wrote
> 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
> agg支持这个参数了。可以期待下。
>
> Best,
> Jark
>
> On Wed, 24 Mar 2021 at 19:29, Robin Zhang <
> vincent2015qdlg@
> >
> wrote:
>
>> Hi,guomuhua
>> 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>>
>> Best,
>> Robin
>>
>>
>> guomuhua wrote
>> > 在SQL中,如果开启了 local-global 参数:set
>> > table.optimizer.agg-phase-strategy=TWO_PHASE;
>> > 或者开启了Partial-Final 参数:set
>> table.optimizer.distinct-agg.split.enabled=true;
>> > set
>> > table.optimizer.distinct-agg.split.bucket-num=1024;
>> > 还需要对应的将SQL改写为两段式吗?
>> > 例如:
>> > 原SQL:
>> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>> >
>> > 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
>> > SELECT day, SUM(cnt) total
>> > FROM (
>> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>> > FROM T GROUP BY day, MOD(buy_id, 1024))
>> > GROUP BY day
>> >
>> > 还是flink会帮我自动改写SQL,我不用关心?
>> >
>> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
>> > <
>> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>
>>
>> >
>> >
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
感谢,如果不是window agg,开启参数后flink会自动打散是吧。那关于window agg,
不能自动打散,这部分的介绍,在文档中可以找到吗?具体在哪里呢?还是需要从源码里找呢?望指教。再次感谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql count distonct 优化
Posted by Jark Wu <im...@gmail.com>.
我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
agg支持这个参数了。可以期待下。
Best,
Jark
On Wed, 24 Mar 2021 at 19:29, Robin Zhang <vi...@outlook.com>
wrote:
> Hi,guomuhua
> 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>
> Best,
> Robin
>
>
> guomuhua wrote
> > 在SQL中,如果开启了 local-global 参数:set
> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> > 或者开启了Partial-Final 参数:set
> table.optimizer.distinct-agg.split.enabled=true;
> > set
> > table.optimizer.distinct-agg.split.bucket-num=1024;
> > 还需要对应的将SQL改写为两段式吗?
> > 例如:
> > 原SQL:
> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >
> > 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> > SELECT day, SUM(cnt) total
> > FROM (
> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> > FROM T GROUP BY day, MOD(buy_id, 1024))
> > GROUP BY day
> >
> > 还是flink会帮我自动改写SQL,我不用关心?
> >
> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> > <
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>
>
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Re: flink sql count distonct 优化
Posted by Robin Zhang <vi...@outlook.com>.
Hi,guomuhua
开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
Best,
Robin
guomuhua wrote
> 在SQL中,如果开启了 local-global 参数:set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
> set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> 还需要对应的将SQL改写为两段式吗?
> 例如:
> 原SQL:
> SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>
> 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> SELECT day, SUM(cnt) total
> FROM (
> SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> FROM T GROUP BY day, MOD(buy_id, 1024))
> GROUP BY day
>
> 还是flink会帮我自动改写SQL,我不用关心?
>
> 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> <http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql count distonct 优化
Posted by Robin Zhang <vi...@outlook.com>.
Hi,guomuhua
`The number of inputs accumulated by local aggregation every time is
based on mini-batch interval. It means local-global aggregation depends on
mini-batch optimization is enabled `
,关于本地聚合,官网有这么一段话,也就是说,需要先开启批次聚合,然后才能使用本地聚合,加起来有三个参数.
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
看你提问时只是开启了本地聚合一个参数,不知道是不是没写全。
Best,
Robin
guomuhua wrote
> 在SQL中,如果开启了 local-global 参数:set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
> set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> 还需要对应的将SQL改写为两段式吗?
> 例如:
> 原SQL:
> SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>
> 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> SELECT day, SUM(cnt) total
> FROM (
> SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> FROM T GROUP BY day, MOD(buy_id, 1024))
> GROUP BY day
>
> 还是flink会帮我自动改写SQL,我不用关心?
>
> 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> <http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Sent from: http://apache-flink.147419.n8.nabble.com/