You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "hdxg1101300123@163.com" <hd...@163.com> on 2022/06/01 07:41:05 UTC
关于flinksql聚合函数实现的学习疑问
您好:
最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
比如这样一条sql语句:
select
dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
tumble(row_time, interval '1' minute);
在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
谢谢!
hdxg1101300123@163.com
Re: 关于flinksql聚合函数实现的学习疑问
Posted by 陈铜玖 <ch...@hst.com>.
Dear:
您可以首先建一个这样的对象
class Acc{
long sum;
long max;
long min;
...
}
在 AggregateFunction 里面维护这样的 ACC ,
就可以在 add 方法里面对维护的 acc 和新传入的值之间实现多需求下的结果更新。
不知道你想了解的是不是这个意思
------------------ Original ------------------
From: "Lincoln Lee"<lincoln.86xy@gmail.com>;
Date: Wed, Jun 1, 2022 04:53 PM
To: "user-zh"<user-zh@flink.apache.org>;
Subject: Re: 关于flinksql聚合函数实现的学习疑问
flink sql 的实现可以参考下 flink-table planner&runtime 部分的代码
从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如
datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑
Best,
Lincoln Lee
hdxg1101300123@163.com <hdxg1101300123@163.com> 于2022年6月1日周三 15:49写道:
> 您好:
> 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
> 比如这样一条sql语句:
> select
> dim,
> count(*) as pv,
> sum(price) as sum_price,
> max(price) as max_price,
> min(price) as min_price,
> -- 计算 uv 数
> count(distinct user_id) as uv,
> UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS
> STRING)) * 1000 as window_start
> from source_table
> group by
> dim,
> tumble(row_time, interval '1' minute);
> 在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
> 如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数
> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K,
> W> windowFunction)
>
> 是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
> 谢谢!
>
>
> hdxg1101300123@163.com
>
Re: 关于flinksql聚合函数实现的学习疑问
Posted by Lincoln Lee <li...@gmail.com>.
flink sql 的实现可以参考下 flink-table planner&runtime 部分的代码
从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如
datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑
Best,
Lincoln Lee
hdxg1101300123@163.com <hd...@163.com> 于2022年6月1日周三 15:49写道:
> 您好:
> 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
> 比如这样一条sql语句:
> select
> dim,
> count(*) as pv,
> sum(price) as sum_price,
> max(price) as max_price,
> min(price) as min_price,
> -- 计算 uv 数
> count(distinct user_id) as uv,
> UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS
> STRING)) * 1000 as window_start
> from source_table
> group by
> dim,
> tumble(row_time, interval '1' minute);
> 在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
> 如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数
> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K,
> W> windowFunction)
>
> 是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
> 谢谢!
>
>
> hdxg1101300123@163.com
>
Re:关于flinksql聚合函数实现的学习疑问
Posted by Xuyang <xy...@163.com>.
Hi, 代码的话可以参考[1],由于agg的相关代码走的是codegen,推荐通过debug相关的测试类到附近。然后观察生成的代码。<br/><br/>[1] https://github.com/apache/flink/blob/95e378e6565eea9b6b83702645e99733c33a957a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregateBase.scala#L171
在 2022-06-01 15:41:05,"hdxg1101300123@163.com" <hd...@163.com> 写道:
>您好:
> 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
> 比如这样一条sql语句:
> select
> dim,
> count(*) as pv,
> sum(price) as sum_price,
> max(price) as max_price,
> min(price) as min_price,
> -- 计算 uv 数
> count(distinct user_id) as uv,
> UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
>from source_table
>group by
> dim,
> tumble(row_time, interval '1' minute);
>在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
>如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
>是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
>谢谢!
>
>
>hdxg1101300123@163.com