You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 史 正超 <sh...@outlook.com> on 2020/11/03 10:34:26 UTC
回复: TUMBLE函数不支持 回撤流
canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT UPDATE DELETE, 相关代码如下:
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
所以在window里消费带有update和delete的数据现在应该是不支持的。
________________________________
发件人: 夜思流年梦 <li...@163.com>
发送时间: 2020年11月3日 9:46
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: TUMBLE函数不支持 回撤流
这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
原sql
select 0 as id
, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h
,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then real_product else 0 end)) as paymoney_h
from dwd_XXX
where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by TUMBLE(proctime ,interval '1' HOUR);
报错:
org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
发现把kafka建表语句改成 json格式就可以
数据源不是flink-mysql-cdc得来的
是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
'connector' = 'kafka',
'properties.group.id' = 'XX',
'properties.bootstrap.servers' = 'XX',
'topic' = 'ODS_XXX',
'scan.startup.mode' = 'group-offsets',
'format' = 'canal-json');
上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:
WITH (
'connector' = 'kafka',
'properties.group.id' = 'XX',
'properties.bootstrap.servers' = 'XXX',
'topic' = 'DWD_XXX',
'scan.startup.mode' = 'group-offsets',
'format' = 'changelog-json');
在 2020-10-30 14:53:09,"admin" <17...@163.com> 写道:
>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦 <li...@163.com> 写道:
>>
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select
>>
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>
>>> ,sum(amt) as paymoney_h
>>
>>> from XXXX
>>
>>> group by TUMBLE(write_time,interval '1' HOUR);
>>
>>
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
>>
>>
>>
>>
>> 发现把kafka建表语句改成 json格式就可以
>>
>>
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>>
>>
>>
>>
>>
>>
>>
>>
>>
Re: TUMBLE函数不支持 回撤流
Posted by 赵一旦 <hi...@gmail.com>.
@LakeShen。 怎么看是append/retract数据流呢?是通过逻辑自己判定还是说有什么flink层面的信息直接反映。
LakeShen <sh...@gmail.com> 于2020年11月4日周三 上午10:12写道:
> Hi 夜思流年梦,
>
> 看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
> 如果是 retract ,应该就不能再上面进行窗口计算了。
>
> Best,
> LakeShen
>
> 史 正超 <sh...@outlook.com> 于2020年11月3日周二 下午6:34写道:
>
> > canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> > UPDATE DELETE, 相关代码如下:
> >
> > @Override
> > public ChangelogMode getChangelogMode() {
> > return ChangelogMode.newBuilder()
> > .addContainedKind(RowKind.INSERT)
> > .addContainedKind(RowKind.UPDATE_BEFORE)
> > .addContainedKind(RowKind.UPDATE_AFTER)
> > .addContainedKind(RowKind.DELETE)
> > .build();
> > }
> >
> > 所以在window里消费带有update和delete的数据现在应该是不支持的。
> > ________________________________
> > 发件人: 夜思流年梦 <li...@163.com>
> > 发送时间: 2020年11月3日 9:46
> > 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> > 主题: TUMBLE函数不支持 回撤流
> >
> >
> >
> >
> > 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
> >
> >
> >
> >
> >
> >
> >
> > 原sql
> >
> > select 0 as id
> >
> > , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
> >
> > ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> > then memberid else NULL end) as paynum_h
> >
> > ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> > 'yyyy-MM-dd') then real_product else 0 end)) as paymoney_h
> >
> > from dwd_XXX
> >
> > where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> >
> > group by TUMBLE(proctime ,interval '1' HOUR);
> >
> >
> > 报错:
> > org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > 发现把kafka建表语句改成 json格式就可以
> >
> >
> > 数据源不是flink-mysql-cdc得来的
> >
> >
> > 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
> >
> >
> > 'connector' = 'kafka',
> > 'properties.group.id' = 'XX',
> > 'properties.bootstrap.servers' = 'XX',
> > 'topic' = 'ODS_XXX',
> > 'scan.startup.mode' = 'group-offsets',
> > 'format' = 'canal-json');
> >
> >
> > 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> > 建kafka表的格式,使用的changelog-json:
> >
> >
> > WITH (
> > 'connector' = 'kafka',
> > 'properties.group.id' = 'XX',
> > 'properties.bootstrap.servers' = 'XXX',
> > 'topic' = 'DWD_XXX',
> > 'scan.startup.mode' = 'group-offsets',
> > 'format' = 'changelog-json');
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-10-30 14:53:09,"admin" <17...@163.com> 写道:
> > >Hi,
> > >能贴一下完整的sql吗,数据源是CDC的数据吗?
> > >
> > >> 2020年10月30日 下午2:48,夜思流年梦 <li...@163.com> 写道:
> > >>
> > >> 开发者你好:
> > >> 现有此场景:
> > >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> > >> select
> > >>
> > >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> > >>
> > >>> ,sum(amt) as paymoney_h
> > >>
> > >>> from XXXX
> > >>
> > >>> group by TUMBLE(write_time,interval '1' HOUR);
> > >>
> > >>
> > >> 报错:
> > >> org.apache.flink.table.api.TableException: GroupWindowAggregate
> doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > >>
> > >>
> > >>
> > >>
> > >> 发现把kafka建表语句改成 json格式就可以
> > >>
> > >>
> > >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> >
> >
> >
> >
> >
> >
>
Re: TUMBLE函数不支持 回撤流
Posted by LakeShen <sh...@gmail.com>.
Hi 夜思流年梦,
看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
如果是 retract ,应该就不能再上面进行窗口计算了。
Best,
LakeShen
史 正超 <sh...@outlook.com> 于2020年11月3日周二 下午6:34写道:
> canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> UPDATE DELETE, 相关代码如下:
>
> @Override
> public ChangelogMode getChangelogMode() {
> return ChangelogMode.newBuilder()
> .addContainedKind(RowKind.INSERT)
> .addContainedKind(RowKind.UPDATE_BEFORE)
> .addContainedKind(RowKind.UPDATE_AFTER)
> .addContainedKind(RowKind.DELETE)
> .build();
> }
>
> 所以在window里消费带有update和delete的数据现在应该是不支持的。
> ________________________________
> 发件人: 夜思流年梦 <li...@163.com>
> 发送时间: 2020年11月3日 9:46
> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> 主题: TUMBLE函数不支持 回撤流
>
>
>
>
> 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
>
>
>
>
>
>
>
> 原sql
>
> select 0 as id
>
> , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
>
> ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> then memberid else NULL end) as paynum_h
>
> ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> 'yyyy-MM-dd') then real_product else 0 end)) as paymoney_h
>
> from dwd_XXX
>
> where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>
> group by TUMBLE(proctime ,interval '1' HOUR);
>
>
> 报错:
> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> 发现把kafka建表语句改成 json格式就可以
>
>
> 数据源不是flink-mysql-cdc得来的
>
>
> 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
>
>
> 'connector' = 'kafka',
> 'properties.group.id' = 'XX',
> 'properties.bootstrap.servers' = 'XX',
> 'topic' = 'ODS_XXX',
> 'scan.startup.mode' = 'group-offsets',
> 'format' = 'canal-json');
>
>
> 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> 建kafka表的格式,使用的changelog-json:
>
>
> WITH (
> 'connector' = 'kafka',
> 'properties.group.id' = 'XX',
> 'properties.bootstrap.servers' = 'XXX',
> 'topic' = 'DWD_XXX',
> 'scan.startup.mode' = 'group-offsets',
> 'format' = 'changelog-json');
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-30 14:53:09,"admin" <17...@163.com> 写道:
> >Hi,
> >能贴一下完整的sql吗,数据源是CDC的数据吗?
> >
> >> 2020年10月30日 下午2:48,夜思流年梦 <li...@163.com> 写道:
> >>
> >> 开发者你好:
> >> 现有此场景:
> >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> >> select
> >>
> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> >>
> >>> ,sum(amt) as paymoney_h
> >>
> >>> from XXXX
> >>
> >>> group by TUMBLE(write_time,interval '1' HOUR);
> >>
> >>
> >> 报错:
> >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> >>
> >>
> >>
> >>
> >> 发现把kafka建表语句改成 json格式就可以
> >>
> >>
> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>
>
>