You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by carlc <ca...@126.com> on 2021/08/04 02:40:56 UTC
Flink sql 维表聚合问题请教
请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
-- 模拟需求(有点牵强...):
-- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
-- 1. 创建user_blacklist表
CREATE TABLE `user_blacklist` (
`user_id` bigint(20) NOT NULL,
`create_time` datetime NOT NULL,
PRIMARY KEY (`user_id`,`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO user_blacklist (`user_id`, `create_time`)
VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), (2,'2021-01-04 00:00:00');
-- 2. 模拟kafka数据:
-- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 00:00:00"}
-- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 00:00:00"}
-- 操作步骤:
当发送第1条kafka数据得到如下输出:
| OP| user_id| event_type | current_ts| bl_count |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
当再次发送第1条kafka数据得到如下输出:
| +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
— SQL 如下:
create table kafka_user_event
(
`user_id` BIGINT,
`event_type` STRING,
`current_ts` timestamp(3),
`proc_time` AS PROCTIME()
) WITH (
'connector' = 'kafka',
...
);
create table mysql_user_blacklist
(
user_id BIGINT,
create_time timestamp(3),
primary key (user_id,create_time) not enforced
) WITH (
'connector' = 'jdbc',
…
);
create view v2_user_event as (
select t1.`user_id`
, t1.`event_type`
, t1.`current_ts`
, count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
from kafka_user_event t1
left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on t1.`user_id` = t2.`user_id`
where t1.`event_type` = 'LOGIN'
);
select * from v2_user_event;
Re: Flink sql 维表聚合问题请教
Posted by carlc <ca...@126.com>.
额...,说的太对了, batch任务没问题,但流任务就发生意想不到的问题.
该需求就是翻译原离线SQL(传统数仓), 现要改成实时分析. 结果发现有些需求好像实现不了
非常感谢!
> 在 2021年8月4日,16:50,黑色 <xi...@qq.com.INVALID> 写道:
>
> 你这是维表lookup,上流来数据来了,根据on后面的key,是当前去查快照返回结果,不可能是聚合之后的
> 当然你要是batch来了,没问题
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "user-zh" <tsreaper96@gmail.com>;
> 发送时间: 2021年8月4日(星期三) 下午4:44
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re: Flink sql 维表聚合问题请教
>
>
>
> Hi!
>
> 我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal
> join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
>
> carlc <caoenergy@126.com> 于2021年8月4日周三 下午3:57写道:
>
> > 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
> >
> > create view v_bl_user_count as (
> > select user_id, count(1)
> > from mysql_user_blacklist
> > group by user_id
> > );
> >
> > select t1.`user_id`
> > , t1.`event_type`
> > , t1.`current_ts`
> > from kafka_user_event t1
> > left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
> > t1.`user_id` = t2.`user_id`
> > where t1.`event_type` = ‘LOGIN’
> >
> > 异常信息:
> > org.apache.flink.table.api.TableException: Processing-time temporal join
> > is not supported yet.
> > at
> > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
> > at
> > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
> > at
> > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
> > at
> > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
> > at
> > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> >
> >
> >
> >
> > > 在 2021年8月4日,14:18,Caizhi Weng <tsreaper96@gmail.com> 写道:
> > >
> > > Hi!
> > >
> > > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
> > >
> > > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
> > > table join 了。
> > >
> > > carlc <caoenergy@126.com> 于2021年8月4日周三 上午10:41写道:
> > >
> > >> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
> > >>
> > >> -- 模拟需求(有点牵强...):
> > >> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表
> > mysql_user_blacklist
> > >> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
> > >>
> > >> -- 1. 创建user_blacklist表
> > >> CREATE TABLE `user_blacklist` (
> > >> `user_id` bigint(20) NOT NULL,
> > >> `create_time` datetime NOT NULL,
> > >> PRIMARY KEY (`user_id`,`create_time`)
> > >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> > >> INSERT INTO user_blacklist (`user_id`, `create_time`)
> > >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> > >> (2,'2021-01-04 00:00:00');
> > >>
> > >> -- 2. 模拟kafka数据:
> > >> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> > >> 00:00:00"}
> > >> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> > >> 00:00:00"}
> > >>
> > >> -- 操作步骤:
> > >> 当发送第1条kafka数据得到如下输出:
> > >> | OP| user_id| event_type | current_ts| bl_count |
> > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> > >> 当再次发送第1条kafka数据得到如下输出:
> > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
> > >>
> > >> — SQL 如下:
> > >>
> > >> create table kafka_user_event
> > >> (
> > >> `user_id` BIGINT,
> > >> `event_type` STRING,
> > >> `current_ts` timestamp(3),
> > >> `proc_time` AS PROCTIME()
> > >> ) WITH (
> > >> 'connector' = 'kafka',
> > >> ...
> > >> );
> > >>
> > >> create table mysql_user_blacklist
> > >> (
> > >> user_id BIGINT,
> > >> create_time timestamp(3),
> > >> primary key (user_id,create_time) not enforced
> > >> ) WITH (
> > >> 'connector' = 'jdbc',
> > >> …
> > >> );
> > >>
> > >> create view v2_user_event as (
> > >> select t1.`user_id`
> > >> , t1.`event_type`
> > >> , t1.`current_ts`
> > >> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> > >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> > >> from kafka_user_event t1
> > >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS
> > t2
> > >> on t1.`user_id` = t2.`user_id`
> > >> where t1.`event_type` = 'LOGIN'
> > >> );
> > >>
> > >> select * from v2_user_event;
> > >>
> > >>
> >
> >
回复: Flink sql 维表聚合问题请教
Posted by 黑色 <xi...@qq.com.INVALID>.
你这是维表lookup,上流来数据来了,根据on后面的key,是当前去查快照返回结果,不可能是聚合之后的
当然你要是batch来了,没问题
------------------ 原始邮件 ------------------
发件人: "user-zh" <tsreaper96@gmail.com>;
发送时间: 2021年8月4日(星期三) 下午4:44
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: Flink sql 维表聚合问题请教
Hi!
我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal
join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
carlc <caoenergy@126.com> 于2021年8月4日周三 下午3:57写道:
> 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
>
> create view v_bl_user_count as (
> select user_id, count(1)
> from mysql_user_blacklist
> group by user_id
> );
>
> select t1.`user_id`
> , t1.`event_type`
> , t1.`current_ts`
> from kafka_user_event t1
> left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
> t1.`user_id` = t2.`user_id`
> where t1.`event_type` = ‘LOGIN’
>
> 异常信息:
> org.apache.flink.table.api.TableException: Processing-time temporal join
> is not supported yet.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>
>
>
>
> > 在 2021年8月4日,14:18,Caizhi Weng <tsreaper96@gmail.com> 写道:
> >
> > Hi!
> >
> > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
> >
> > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
> > table join 了。
> >
> > carlc <caoenergy@126.com> 于2021年8月4日周三 上午10:41写道:
> >
> >> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
> >>
> >> -- 模拟需求(有点牵强...):
> >> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表
> mysql_user_blacklist
> >> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
> >>
> >> -- 1. 创建user_blacklist表
> >> CREATE TABLE `user_blacklist` (
> >> `user_id` bigint(20) NOT NULL,
> >> `create_time` datetime NOT NULL,
> >> PRIMARY KEY (`user_id`,`create_time`)
> >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> >> INSERT INTO user_blacklist (`user_id`, `create_time`)
> >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> >> (2,'2021-01-04 00:00:00');
> >>
> >> -- 2. 模拟kafka数据:
> >> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> >> 00:00:00"}
> >> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> >> 00:00:00"}
> >>
> >> -- 操作步骤:
> >> 当发送第1条kafka数据得到如下输出:
> >> | OP| user_id| event_type | current_ts| bl_count |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> >> 当再次发送第1条kafka数据得到如下输出:
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
> >>
> >> — SQL 如下:
> >>
> >> create table kafka_user_event
> >> (
> >> `user_id` BIGINT,
> >> `event_type` STRING,
> >> `current_ts` timestamp(3),
> >> `proc_time` AS PROCTIME()
> >> ) WITH (
> >> 'connector' = 'kafka',
> >> ...
> >> );
> >>
> >> create table mysql_user_blacklist
> >> (
> >> user_id BIGINT,
> >> create_time timestamp(3),
> >> primary key (user_id,create_time) not enforced
> >> ) WITH (
> >> 'connector' = 'jdbc',
> >> …
> >> );
> >>
> >> create view v2_user_event as (
> >> select t1.`user_id`
> >> , t1.`event_type`
> >> , t1.`current_ts`
> >> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> >> from kafka_user_event t1
> >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS
> t2
> >> on t1.`user_id` = t2.`user_id`
> >> where t1.`event_type` = 'LOGIN'
> >> );
> >>
> >> select * from v2_user_event;
> >>
> >>
>
>
Re: Flink sql 维表聚合问题请教
Posted by Caizhi Weng <ts...@gmail.com>.
Hi!
我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal
join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
carlc <ca...@126.com> 于2021年8月4日周三 下午3:57写道:
> 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
>
> create view v_bl_user_count as (
> select user_id, count(1)
> from mysql_user_blacklist
> group by user_id
> );
>
> select t1.`user_id`
> , t1.`event_type`
> , t1.`current_ts`
> from kafka_user_event t1
> left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
> t1.`user_id` = t2.`user_id`
> where t1.`event_type` = ‘LOGIN’
>
> 异常信息:
> org.apache.flink.table.api.TableException: Processing-time temporal join
> is not supported yet.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>
>
>
>
> > 在 2021年8月4日,14:18,Caizhi Weng <ts...@gmail.com> 写道:
> >
> > Hi!
> >
> > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
> >
> > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
> > table join 了。
> >
> > carlc <ca...@126.com> 于2021年8月4日周三 上午10:41写道:
> >
> >> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
> >>
> >> -- 模拟需求(有点牵强...):
> >> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表
> mysql_user_blacklist
> >> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
> >>
> >> -- 1. 创建user_blacklist表
> >> CREATE TABLE `user_blacklist` (
> >> `user_id` bigint(20) NOT NULL,
> >> `create_time` datetime NOT NULL,
> >> PRIMARY KEY (`user_id`,`create_time`)
> >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> >> INSERT INTO user_blacklist (`user_id`, `create_time`)
> >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> >> (2,'2021-01-04 00:00:00');
> >>
> >> -- 2. 模拟kafka数据:
> >> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> >> 00:00:00"}
> >> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> >> 00:00:00"}
> >>
> >> -- 操作步骤:
> >> 当发送第1条kafka数据得到如下输出:
> >> | OP| user_id| event_type | current_ts| bl_count |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> >> 当再次发送第1条kafka数据得到如下输出:
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
> >>
> >> — SQL 如下:
> >>
> >> create table kafka_user_event
> >> (
> >> `user_id` BIGINT,
> >> `event_type` STRING,
> >> `current_ts` timestamp(3),
> >> `proc_time` AS PROCTIME()
> >> ) WITH (
> >> 'connector' = 'kafka',
> >> ...
> >> );
> >>
> >> create table mysql_user_blacklist
> >> (
> >> user_id BIGINT,
> >> create_time timestamp(3),
> >> primary key (user_id,create_time) not enforced
> >> ) WITH (
> >> 'connector' = 'jdbc',
> >> …
> >> );
> >>
> >> create view v2_user_event as (
> >> select t1.`user_id`
> >> , t1.`event_type`
> >> , t1.`current_ts`
> >> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> >> from kafka_user_event t1
> >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS
> t2
> >> on t1.`user_id` = t2.`user_id`
> >> where t1.`event_type` = 'LOGIN'
> >> );
> >>
> >> select * from v2_user_event;
> >>
> >>
>
>
Re: Flink sql 维表聚合问题请教
Posted by carlc <ca...@126.com>.
感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
create view v_bl_user_count as (
select user_id, count(1)
from mysql_user_blacklist
group by user_id
);
select t1.`user_id`
, t1.`event_type`
, t1.`current_ts`
from kafka_user_event t1
left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on t1.`user_id` = t2.`user_id`
where t1.`event_type` = ‘LOGIN’
异常信息:
org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> 在 2021年8月4日,14:18,Caizhi Weng <ts...@gmail.com> 写道:
>
> Hi!
>
> 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
>
> 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
> table join 了。
>
> carlc <ca...@126.com> 于2021年8月4日周三 上午10:41写道:
>
>> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
>>
>> -- 模拟需求(有点牵强...):
>> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist
>> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
>>
>> -- 1. 创建user_blacklist表
>> CREATE TABLE `user_blacklist` (
>> `user_id` bigint(20) NOT NULL,
>> `create_time` datetime NOT NULL,
>> PRIMARY KEY (`user_id`,`create_time`)
>> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
>> INSERT INTO user_blacklist (`user_id`, `create_time`)
>> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
>> (2,'2021-01-04 00:00:00');
>>
>> -- 2. 模拟kafka数据:
>> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
>> 00:00:00"}
>> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
>> 00:00:00"}
>>
>> -- 操作步骤:
>> 当发送第1条kafka数据得到如下输出:
>> | OP| user_id| event_type | current_ts| bl_count |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
>> 当再次发送第1条kafka数据得到如下输出:
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
>>
>> — SQL 如下:
>>
>> create table kafka_user_event
>> (
>> `user_id` BIGINT,
>> `event_type` STRING,
>> `current_ts` timestamp(3),
>> `proc_time` AS PROCTIME()
>> ) WITH (
>> 'connector' = 'kafka',
>> ...
>> );
>>
>> create table mysql_user_blacklist
>> (
>> user_id BIGINT,
>> create_time timestamp(3),
>> primary key (user_id,create_time) not enforced
>> ) WITH (
>> 'connector' = 'jdbc',
>> …
>> );
>>
>> create view v2_user_event as (
>> select t1.`user_id`
>> , t1.`event_type`
>> , t1.`current_ts`
>> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
>> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
>> from kafka_user_event t1
>> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2
>> on t1.`user_id` = t2.`user_id`
>> where t1.`event_type` = 'LOGIN'
>> );
>>
>> select * from v2_user_event;
>>
>>
Re: Flink sql 维表聚合问题请教
Posted by Caizhi Weng <ts...@gmail.com>.
Hi!
这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
table join 了。
carlc <ca...@126.com> 于2021年8月4日周三 上午10:41写道:
> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
>
> -- 模拟需求(有点牵强...):
> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist
> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
>
> -- 1. 创建user_blacklist表
> CREATE TABLE `user_blacklist` (
> `user_id` bigint(20) NOT NULL,
> `create_time` datetime NOT NULL,
> PRIMARY KEY (`user_id`,`create_time`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> INSERT INTO user_blacklist (`user_id`, `create_time`)
> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> (2,'2021-01-04 00:00:00');
>
> -- 2. 模拟kafka数据:
> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> 00:00:00"}
> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> 00:00:00"}
>
> -- 操作步骤:
> 当发送第1条kafka数据得到如下输出:
> | OP| user_id| event_type | current_ts| bl_count |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> 当再次发送第1条kafka数据得到如下输出:
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
>
> — SQL 如下:
>
> create table kafka_user_event
> (
> `user_id` BIGINT,
> `event_type` STRING,
> `current_ts` timestamp(3),
> `proc_time` AS PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> ...
> );
>
> create table mysql_user_blacklist
> (
> user_id BIGINT,
> create_time timestamp(3),
> primary key (user_id,create_time) not enforced
> ) WITH (
> 'connector' = 'jdbc',
> …
> );
>
> create view v2_user_event as (
> select t1.`user_id`
> , t1.`event_type`
> , t1.`current_ts`
> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> from kafka_user_event t1
> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2
> on t1.`user_id` = t2.`user_id`
> where t1.`event_type` = 'LOGIN'
> );
>
> select * from v2_user_event;
>
>