You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by forideal <fs...@163.com> on 2020/04/28 02:45:41 UTC
Flink Buildin UDF 性能较慢
大家好:
我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG 等等。
我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误)
SQL:
select
query_nor,
sum(cast (1asbigint))as query_nor_counter
from ods_search_track
groupby
query_nor,
HOP(
event_time,interval'30'SECOND,interval'30'MINUTE)
sum:
public class Sum extends AggregateFunction<Long, AtomicLong> {
@Override
public boolean isDeterministic() {
return false;
}
@Override
public AtomicLong createAccumulator() {
return new AtomicLong();
}
@Override
public void open(FunctionContext context) throws Exception {
}
@Override
public Long getValue(AtomicLong acc) {
return acc.get();
}
@Override
public TypeInformation getResultType() {
return Types.LONG;
}
public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
Iterator<AtomicLong> iter = it.iterator();
while (iter.hasNext()) {
AtomicLong a = iter.next();
acc.addAndGet(a.get());
}
}
public void accumulate(AtomicLong datas, Long data) {
datas.addAndGet(data);
}
}
使用 Flink buildin COUNT
select
query_nor,
count(1) as query_nor_counter
from ods_search_track
groupby
query_nor,
HOP(
event_time,interval'30'SECOND,interval'30'MINUTE)
Re:Re: Flink Buildin UDF 性能较慢
Posted by forideal <fs...@163.com>.
Hi Jark:
Thanks for your replay!
1. 是基于哪个版本,哪个 planner 进行的测试?
Flink 1.9.0 Blink Planner
2. 流计算模式还是批计算模式?
流计算模式
3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
注册的名字为 red_sum
Best forideal
在 2020-04-28 11:13:50,"Jark Wu" <im...@gmail.com> 写道:
>Hi,
>
>看了你的 UDAF 的实现,理论上是不可能比内置 count/sum 快的。可能是哪里有 bug 或是测试方式不对。
>我想先问几个问题:
>1. 是基于哪个版本,哪个 planner 进行的测试?
>2. 流计算模式还是批计算模式?
>3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
>
>Best,
>Jark
>
>On Tue, 28 Apr 2020 at 10:46, forideal <fs...@163.com> wrote:
>
>> 大家好:
>>
>>
>> 我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG
>> 等等。
>> 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误)
>>
>>
>> SQL:
>>
>>
>> select
>> query_nor,
>> sum(cast (1asbigint))as query_nor_counter
>> from ods_search_track
>> groupby
>> query_nor,
>> HOP(
>> event_time,interval'30'SECOND,interval'30'MINUTE)
>> sum:
>> public class Sum extends AggregateFunction<Long, AtomicLong> {
>>
>> @Override
>> public boolean isDeterministic() {
>> return false;
>> }
>>
>> @Override
>> public AtomicLong createAccumulator() {
>> return new AtomicLong();
>> }
>>
>> @Override
>> public void open(FunctionContext context) throws Exception {
>>
>> }
>>
>> @Override
>> public Long getValue(AtomicLong acc) {
>> return acc.get();
>> }
>>
>> @Override
>> public TypeInformation getResultType() {
>> return Types.LONG;
>> }
>>
>> public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
>> Iterator<AtomicLong> iter = it.iterator();
>> while (iter.hasNext()) {
>> AtomicLong a = iter.next();
>> acc.addAndGet(a.get());
>> }
>> }
>>
>> public void accumulate(AtomicLong datas, Long data) {
>> datas.addAndGet(data);
>> }
>> }
>>
>>
>> 使用 Flink buildin COUNT
>>
>>
>> select
>> query_nor,
>> count(1) as query_nor_counter
>> from ods_search_track
>> groupby
>> query_nor,
>> HOP(
>> event_time,interval'30'SECOND,interval'30'MINUTE)
Re: Flink Buildin UDF 性能较慢
Posted by Jark Wu <im...@gmail.com>.
Hi,
看了你的 UDAF 的实现,理论上是不可能比内置 count/sum 快的。可能是哪里有 bug 或是测试方式不对。
我想先问几个问题:
1. 是基于哪个版本,哪个 planner 进行的测试?
2. 流计算模式还是批计算模式?
3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
Best,
Jark
On Tue, 28 Apr 2020 at 10:46, forideal <fs...@163.com> wrote:
> 大家好:
>
>
> 我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG
> 等等。
> 我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误)
>
>
> SQL:
>
>
> select
> query_nor,
> sum(cast (1asbigint))as query_nor_counter
> from ods_search_track
> groupby
> query_nor,
> HOP(
> event_time,interval'30'SECOND,interval'30'MINUTE)
> sum:
> public class Sum extends AggregateFunction<Long, AtomicLong> {
>
> @Override
> public boolean isDeterministic() {
> return false;
> }
>
> @Override
> public AtomicLong createAccumulator() {
> return new AtomicLong();
> }
>
> @Override
> public void open(FunctionContext context) throws Exception {
>
> }
>
> @Override
> public Long getValue(AtomicLong acc) {
> return acc.get();
> }
>
> @Override
> public TypeInformation getResultType() {
> return Types.LONG;
> }
>
> public void merge(AtomicLong acc, Iterable<AtomicLong> it) {
> Iterator<AtomicLong> iter = it.iterator();
> while (iter.hasNext()) {
> AtomicLong a = iter.next();
> acc.addAndGet(a.get());
> }
> }
>
> public void accumulate(AtomicLong datas, Long data) {
> datas.addAndGet(data);
> }
> }
>
>
> 使用 Flink buildin COUNT
>
>
> select
> query_nor,
> count(1) as query_nor_counter
> from ods_search_track
> groupby
> query_nor,
> HOP(
> event_time,interval'30'SECOND,interval'30'MINUTE)