You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by chen310 <22...@163.com> on 2020/09/13 02:37:34 UTC
UDAF函数在over窗口使用问题
flink版本 1.11.1
实现了一个UDAF聚集函数,将窗口内某些字段合并成一个字符串。代码如下:
public class AggDistinctDetail extends AggregateFunction<String,
AggDistinctDetail.Details> {
private static final Logger logger =
LoggerFactory.getLogger(AggDistinctDetail.class);
public static class Details {
public Set<String> set;
}
@Override
public Details createAccumulator() {
return new Details();
}
@Override
public String getValue(Details acc) {
return JSON.toJSONString(acc.set);
}
public void accumulate(Details acc, String val) {
if (acc.set == null) {
acc.set = new HashSet<>();
}
acc.set.add(val);
}
public void retract(Details acc, String val) {
//now, agg detail don't need support retraction
}
public void merge(Details acc, Iterable<Details> it) {
Iterator<Details> iter = it.iterator();
if (acc.set == null) {
acc.set = new HashSet<>();
}
while (iter.hasNext()) {
Details a = iter.next();
acc.set.addAll(a.set);
}
}
public void resetAccumulator(Details acc) {
acc.set = null;
}
}
将此UDAF使用在over窗口上,此窗口按realIp分区,以消息中事件时间(EventTime)
requestDateTime向前推24小时作为窗口,统计窗口内realIp对应的所有userId,作为明细输出userId聚集后的字符串。
drop function if exists UDF_InfoDistinctMerge;
create function UDF_InfoDistinctMerge AS
'com.binance.risk.flink.udf.AggDistinctDetail';
select
realIp ,
UDF_InfoDistinctMerge(userId) over w1 as userSet
from source_table
window w1 as (partition by realIp order by requestDateTime asc RANGE BETWEEN
INTERVAL '24' hour preceding AND CURRENT ROW) ;
实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。
问题:
是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: UDAF函数在over窗口使用问题
Posted by chen310 <22...@163.com>.
谢谢,的确是retract方法没有实现导致了。已解决
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: UDAF函数在over窗口使用问题
Posted by Tianwang Li <li...@gmail.com>.
有没有文章,介绍过期时间清理的?需不需要用户自己设置TTL。
例如:我有一个TOPN计算,怎么做过期数据清理?(还是会自动处理)
SELECT cnt, word, time_hour
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY time_hour ORDER BY cnt desc) AS rownum
FROM test_word_count)
WHERE rownum <= 100;
Benchao Li <li...@apache.org> 于2020年9月14日周一 下午1:03写道:
> Hi,
>
> 看起来你并没有实现`retract` 方法,正常来讲,over window在处理过期数据的时候,会将过期的数据进行一次retract计算。
> 所以你需要正确的实现一下retract方法。
>
> chen310 <22...@163.com> 于2020年9月14日周一 上午10:01写道:
>
> > flink版本 1.11.1
> >
> > 实现了一个UDAF聚集函数,将窗口内某些字段合并成一个字符串。代码如下:
> >
> > public class AggDistinctDetail extends AggregateFunction<String,
> > AggDistinctDetail.Details> {
> > private static final Logger logger =
> > LoggerFactory.getLogger(AggDistinctDetail.class);
> >
> > public static class Details {
> > public Set<String> set;
> > }
> >
> > @Override
> > public Details createAccumulator() {
> > return new Details();
> > }
> >
> > @Override
> > public String getValue(Details acc) {
> > return JSON.toJSONString(acc.set);
> > }
> >
> > public void accumulate(Details acc, String val) {
> > if (acc.set == null) {
> > acc.set = new HashSet<>();
> > }
> > acc.set.add(val);
> > }
> >
> > public void retract(Details acc, String val) {
> > //now, agg detail don't need support retraction
> > }
> >
> > public void merge(Details acc, Iterable<Details> it) {
> > Iterator<Details> iter = it.iterator();
> > if (acc.set == null) {
> > acc.set = new HashSet<>();
> > }
> > while (iter.hasNext()) {
> > Details a = iter.next();
> > acc.set.addAll(a.set);
> > }
> > }
> >
> > public void resetAccumulator(Details acc) {
> > acc.set = null;
> > }
> > }
> >
> > 将此UDAF使用在over窗口上,此窗口按realIp分区,以消息中事件时间(EventTime)
> > requestDateTime向前推24小时作为窗口,统计窗口内realIp对应的所有userId,作为明细输出userId聚集后的字符串。
> >
> > drop function if exists UDF_InfoDistinctMerge;
> > create function UDF_InfoDistinctMerge AS
> > 'com.binance.risk.flink.udf.AggDistinctDetail';
> >
> > select
> > realIp ,
> > UDF_InfoDistinctMerge(userId) over w1 as userSet
> > from source_table
> > window w1 as (partition by realIp order by requestDateTime asc RANGE
> > BETWEEN
> > INTERVAL '24' hour preceding AND CURRENT ROW) ;
> >
> > 实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。
> >
> > 问题:
> > 是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>
>
> --
>
> Best,
> Benchao Li
>
--
**************************************
tivanli
**************************************
Re: UDAF函数在over窗口使用问题
Posted by Benchao Li <li...@apache.org>.
Hi,
看起来你并没有实现`retract` 方法,正常来讲,over window在处理过期数据的时候,会将过期的数据进行一次retract计算。
所以你需要正确的实现一下retract方法。
chen310 <22...@163.com> 于2020年9月14日周一 上午10:01写道:
> flink版本 1.11.1
>
> 实现了一个UDAF聚集函数,将窗口内某些字段合并成一个字符串。代码如下:
>
> public class AggDistinctDetail extends AggregateFunction<String,
> AggDistinctDetail.Details> {
> private static final Logger logger =
> LoggerFactory.getLogger(AggDistinctDetail.class);
>
> public static class Details {
> public Set<String> set;
> }
>
> @Override
> public Details createAccumulator() {
> return new Details();
> }
>
> @Override
> public String getValue(Details acc) {
> return JSON.toJSONString(acc.set);
> }
>
> public void accumulate(Details acc, String val) {
> if (acc.set == null) {
> acc.set = new HashSet<>();
> }
> acc.set.add(val);
> }
>
> public void retract(Details acc, String val) {
> //now, agg detail don't need support retraction
> }
>
> public void merge(Details acc, Iterable<Details> it) {
> Iterator<Details> iter = it.iterator();
> if (acc.set == null) {
> acc.set = new HashSet<>();
> }
> while (iter.hasNext()) {
> Details a = iter.next();
> acc.set.addAll(a.set);
> }
> }
>
> public void resetAccumulator(Details acc) {
> acc.set = null;
> }
> }
>
> 将此UDAF使用在over窗口上,此窗口按realIp分区,以消息中事件时间(EventTime)
> requestDateTime向前推24小时作为窗口,统计窗口内realIp对应的所有userId,作为明细输出userId聚集后的字符串。
>
> drop function if exists UDF_InfoDistinctMerge;
> create function UDF_InfoDistinctMerge AS
> 'com.binance.risk.flink.udf.AggDistinctDetail';
>
> select
> realIp ,
> UDF_InfoDistinctMerge(userId) over w1 as userSet
> from source_table
> window w1 as (partition by realIp order by requestDateTime asc RANGE
> BETWEEN
> INTERVAL '24' hour preceding AND CURRENT ROW) ;
>
> 实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。
>
> 问题:
> 是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li