You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Benchao Li <li...@gmail.com> on 2020/04/21 09:45:54 UTC

Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这是个很好的问题。

> 这个方法的调用时机是什么呢,会被调用几次呢?
这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
当然这里说的是regular groupby;
如果是window group by的话,就是每个window都会做上面的这个事情。

> 一个accumulator的生命周期是怎么样的?
如果是window group by的话,那它的生命周期就是跟window是一样的。
如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
数据是0条的时候,也会销毁。

> 一个accumulator会被反复的序列化反序列化吗?
这个问题非常好。它是否序列化跟你用的state backend有关系。
如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
checkpoint的时候序列化。
当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。

刘首维 <li...@autohome.com.cn> 于2020年4月21日周二 下午5:37写道:

> Hi all,
>
>
>
>    最近有几个疑问没能很好地理解清楚:
>
>
>
>    我们都知道,UDAF中的有createAccumulator这个方法,那么:
>
> 这个方法的调用时机是什么呢,会被调用几次呢?
>
> 一个accumulator的生命周期是怎么样的?
>
> 一个accumulator会被反复的序列化反序列化吗?
>
>
>  麻烦了解相关细节的社区的同学们帮忙解答一下~
>
> 先谢谢啦
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Posted by Benchao Li <li...@gmail.com>.
Hi 首维,

非常开心我的回答能对你有所帮助。

1, 我感觉你这个想法其实就是把state backend 设置为filesystem的表现。因为在state
backend是filesystem的时候,state是放到内存的,也就是更新state是不需要序列化和反序列化的,性能相对要高一些。如果使用rocksdb的话,state就是放到rocksdb里了。我理解这里之所以是这样设计,是因为某些state本身会比较大,内存不一定放得下,此时如果用rocksdb作为state
backend,就不会占用heap的内存。
2, 其实这里的销毁的意思就是clear acc对应的state,这个应该跟第一个问题是有关系的,销毁不需要的状态是为了不让state无限增长。


刘首维 <li...@autohome.com.cn> 于2020年4月22日周三 下午1:56写道:

> Hi benchao,
>
>   感谢你的回复,完美解答了我的疑惑。然后我在看代码做实验的时候又派生了两个想法/问题:
>
>     1. 将accu作为一个单独的field在GroupAggFunction中,然后在snapshotState的时候才去向state更新
>     2. 为accu设置一个0/初始/空/幺元或者类似概念的状态,这样我们就不必去销毁acc而是去只是将其置回
>
>
>   或者说时区在设计这个部分的时候,有什么其他的考量吗
> ________________________________
> 发件人: Benchao Li <li...@gmail.com>
> 发送时间: 2020年4月21日 18:28:09
> 收件人: 刘首维
> 抄送: user-zh
> 主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题
>
> Hi 首维,
>
>
> 这里说的是同一个key会创建一次,所以你看到一个AggFunction会创建多次更这个并不冲突,因为一个AggFunction属于一个subtask维度的,一个subtask处理不止一个key的。
>
> 你的第二个问题:
> > 我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group
> by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来
> 这个的确是存在这个问题。比如两个group by嵌套使用,第一个group by由于会更新结果,所以会retract之前的结果。第二个group
> by收到retract就会先撤销当前的结果,然后再发送撤销之后的结果;然后后面又会来一条正常的append数据,这条数据导致再次retract一次,和append一次。
>
> > 还有在agg over group by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
> 这个没有本质区别,正常情况下一个key下的agg是会一直存在的。除非是你配置了state
> retention时间,那么对应的key的state如果过了retention时间没有访问就会被清理。
>
> 刘首维 <li...@autohome.com.cn>>
> 于2020年4月21日周二 下午5:59写道:
>
> Hi benchao,
>
>
> 非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group
> by的一个key应该被创建一次,可是我做实验的时候(在create
> acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。
>
>
>   为了方便你帮我分析,我来补充一下环境和场景:
>
>
>    版本: 1.7.2/1.9.1
>
>   场景 : group by 嵌套, 常规聚合
>
>
>
>   我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group
> by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group
> by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
>
>
> 再次感谢你的回复
>
> best regards
>
> ________________________________
> 发件人: Benchao Li <li...@gmail.com>>
> 发送时间: 2020年4月21日 17:45:54
> 收件人: user-zh
> 主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题
>
> Hi 首维,
>
> 这是个很好的问题。
>
> > 这个方法的调用时机是什么呢,会被调用几次呢?
> 这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
> 当然这里说的是regular groupby;
> 如果是window group by的话,就是每个window都会做上面的这个事情。
>
> > 一个accumulator的生命周期是怎么样的?
> 如果是window group by的话,那它的生命周期就是跟window是一样的。
> 如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
> 数据是0条的时候,也会销毁。
>
> > 一个accumulator会被反复的序列化反序列化吗?
> 这个问题非常好。它是否序列化跟你用的state backend有关系。
> 如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
> checkpoint的时候序列化。
> 当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。
>
> 刘首维 <li...@autohome.com.cn>>
> 于2020年4月21日周二 下午5:37写道:
>
> > Hi all,
> >
> >
> >
> >    最近有几个疑问没能很好地理解清楚:
> >
> >
> >
> >    我们都知道,UDAF中的有createAccumulator这个方法,那么:
> >
> > 这个方法的调用时机是什么呢,会被调用几次呢?
> >
> > 一个accumulator的生命周期是怎么样的?
> >
> > 一个accumulator会被反复的序列化反序列化吗?
> >
> >
> >  麻烦了解相关细节的社区的同学们帮忙解答一下~
> >
> > 先谢谢啦
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com<ma...@gmail.com>;
> libenchao@pku.edu.cn<ma...@pku.edu.cn>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com<ma...@gmail.com>;
> libenchao@pku.edu.cn<ma...@pku.edu.cn>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

答复: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Posted by 刘首维 <li...@autohome.com.cn>.
Hi benchao,

  感谢你的回复,完美解答了我的疑惑。然后我在看代码做实验的时候又派生了两个想法/问题:

    1. 将accu作为一个单独的field在GroupAggFunction中,然后在snapshotState的时候才去向state更新
    2. 为accu设置一个0/初始/空/幺元或者类似概念的状态,这样我们就不必去销毁acc而是去只是将其置回


  或者说时区在设计这个部分的时候,有什么其他的考量吗
________________________________
发件人: Benchao Li <li...@gmail.com>
发送时间: 2020年4月21日 18:28:09
收件人: 刘首维
抄送: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这里说的是同一个key会创建一次,所以你看到一个AggFunction会创建多次更这个并不冲突,因为一个AggFunction属于一个subtask维度的,一个subtask处理不止一个key的。

你的第二个问题:
> 我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来
这个的确是存在这个问题。比如两个group by嵌套使用,第一个group by由于会更新结果,所以会retract之前的结果。第二个group by收到retract就会先撤销当前的结果,然后再发送撤销之后的结果;然后后面又会来一条正常的append数据,这条数据导致再次retract一次,和append一次。

> 还有在agg over group by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
这个没有本质区别,正常情况下一个key下的agg是会一直存在的。除非是你配置了state retention时间,那么对应的key的state如果过了retention时间没有访问就会被清理。

刘首维 <li...@autohome.com.cn>> 于2020年4月21日周二 下午5:59写道:

Hi benchao,


非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group by的一个key应该被创建一次,可是我做实验的时候(在create acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。


  为了方便你帮我分析,我来补充一下环境和场景:


   版本: 1.7.2/1.9.1

  场景 : group by 嵌套, 常规聚合



  我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。


再次感谢你的回复

best regards

________________________________
发件人: Benchao Li <li...@gmail.com>>
发送时间: 2020年4月21日 17:45:54
收件人: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这是个很好的问题。

> 这个方法的调用时机是什么呢,会被调用几次呢?
这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
当然这里说的是regular groupby;
如果是window group by的话,就是每个window都会做上面的这个事情。

> 一个accumulator的生命周期是怎么样的?
如果是window group by的话,那它的生命周期就是跟window是一样的。
如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
数据是0条的时候,也会销毁。

> 一个accumulator会被反复的序列化反序列化吗?
这个问题非常好。它是否序列化跟你用的state backend有关系。
如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
checkpoint的时候序列化。
当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。

刘首维 <li...@autohome.com.cn>> 于2020年4月21日周二 下午5:37写道:

> Hi all,
>
>
>
>    最近有几个疑问没能很好地理解清楚:
>
>
>
>    我们都知道,UDAF中的有createAccumulator这个方法,那么:
>
> 这个方法的调用时机是什么呢,会被调用几次呢?
>
> 一个accumulator的生命周期是怎么样的?
>
> 一个accumulator会被反复的序列化反序列化吗?
>
>
>  麻烦了解相关细节的社区的同学们帮忙解答一下~
>
> 先谢谢啦
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com<ma...@gmail.com>; libenchao@pku.edu.cn<ma...@pku.edu.cn>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com<ma...@gmail.com>; libenchao@pku.edu.cn<ma...@pku.edu.cn>

Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Posted by Benchao Li <li...@gmail.com>.
Hi 首维,

这里说的是同一个key会创建一次,所以你看到一个AggFunction会创建多次更这个并不冲突,因为一个AggFunction属于一个subtask维度的,一个subtask处理不止一个key的。

你的第二个问题:
> 我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group
by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来
这个的确是存在这个问题。比如两个group by嵌套使用,第一个group by由于会更新结果,所以会retract之前的结果。第二个group
by收到retract就会先撤销当前的结果,然后再发送撤销之后的结果;然后后面又会来一条正常的append数据,这条数据导致再次retract一次,和append一次。

> 还有在agg over group by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
这个没有本质区别,正常情况下一个key下的agg是会一直存在的。除非是你配置了state
retention时间,那么对应的key的state如果过了retention时间没有访问就会被清理。

刘首维 <li...@autohome.com.cn> 于2020年4月21日周二 下午5:59写道:

> Hi benchao,
>
>
> 非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group
> by的一个key应该被创建一次,可是我做实验的时候(在create
> acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。
>
>
>   为了方便你帮我分析,我来补充一下环境和场景:
>
>
>    版本: 1.7.2/1.9.1
>
>   场景 : group by 嵌套, 常规聚合
>
>
>
>   我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group
> by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group
> by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
>
>
> 再次感谢你的回复
>
> best regards
> ------------------------------
> *发件人:* Benchao Li <li...@gmail.com>
> *发送时间:* 2020年4月21日 17:45:54
> *收件人:* user-zh
> *主题:* Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题
>
> Hi 首维,
>
> 这是个很好的问题。
>
> > 这个方法的调用时机是什么呢,会被调用几次呢?
> 这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
> 当然这里说的是regular groupby;
> 如果是window group by的话,就是每个window都会做上面的这个事情。
>
> > 一个accumulator的生命周期是怎么样的?
> 如果是window group by的话,那它的生命周期就是跟window是一样的。
> 如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
> 数据是0条的时候,也会销毁。
>
> > 一个accumulator会被反复的序列化反序列化吗?
> 这个问题非常好。它是否序列化跟你用的state backend有关系。
> 如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
> checkpoint的时候序列化。
> 当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。
>
> 刘首维 <li...@autohome.com.cn> 于2020年4月21日周二 下午5:37写道:
>
> > Hi all,
> >
> >
> >
> >    最近有几个疑问没能很好地理解清楚:
> >
> >
> >
> >    我们都知道,UDAF中的有createAccumulator这个方法,那么:
> >
> > 这个方法的调用时机是什么呢,会被调用几次呢?
> >
> > 一个accumulator的生命周期是怎么样的?
> >
> > 一个accumulator会被反复的序列化反序列化吗?
> >
> >
> >  麻烦了解相关细节的社区的同学们帮忙解答一下~
> >
> > 先谢谢啦
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

答复: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Posted by 刘首维 <li...@autohome.com.cn>.
Hi benchao,


非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group by的一个key应该被创建一次,可是我做实验的时候(在create acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。


  为了方便你帮我分析,我来补充一下环境和场景:


   版本: 1.7.2/1.9.1

  场景 : group by 嵌套, 常规聚合



  我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。


再次感谢你的回复

best regards

________________________________
发件人: Benchao Li <li...@gmail.com>
发送时间: 2020年4月21日 17:45:54
收件人: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这是个很好的问题。

> 这个方法的调用时机是什么呢,会被调用几次呢?
这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
当然这里说的是regular groupby;
如果是window group by的话,就是每个window都会做上面的这个事情。

> 一个accumulator的生命周期是怎么样的?
如果是window group by的话,那它的生命周期就是跟window是一样的。
如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
数据是0条的时候,也会销毁。

> 一个accumulator会被反复的序列化反序列化吗?
这个问题非常好。它是否序列化跟你用的state backend有关系。
如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
checkpoint的时候序列化。
当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。

刘首维 <li...@autohome.com.cn> 于2020年4月21日周二 下午5:37写道:

> Hi all,
>
>
>
>    最近有几个疑问没能很好地理解清楚:
>
>
>
>    我们都知道,UDAF中的有createAccumulator这个方法,那么:
>
> 这个方法的调用时机是什么呢,会被调用几次呢?
>
> 一个accumulator的生命周期是怎么样的?
>
> 一个accumulator会被反复的序列化反序列化吗?
>
>
>  麻烦了解相关细节的社区的同学们帮忙解答一下~
>
> 先谢谢啦
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn