You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 史 正超 <sh...@outlook.com> on 2020/11/03 10:34:26 UTC

回复: TUMBLE函数不支持 回撤流

canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT UPDATE DELETE, 相关代码如下:

@Override
public ChangelogMode getChangelogMode() {
   return ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.UPDATE_BEFORE)
      .addContainedKind(RowKind.UPDATE_AFTER)
      .addContainedKind(RowKind.DELETE)
      .build();
}

所以在window里消费带有update和delete的数据现在应该是不支持的。
________________________________
发件人: 夜思流年梦 <li...@163.com>
发送时间: 2020年11月3日 9:46
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: TUMBLE函数不支持 回撤流




这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;







原sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


报错:
 org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
发现把kafka建表语句改成 json格式就可以


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');











在 2020-10-30 14:53:09,"admin" <17...@163.com> 写道:
>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦 <li...@163.com> 写道:
>>
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select
>>
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>
>>> ,sum(amt) as paymoney_h
>>
>>> from XXXX
>>
>>> group by TUMBLE(write_time,interval '1' HOUR);
>>
>>
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
>>
>>
>>
>>
>> 发现把kafka建表语句改成 json格式就可以
>>
>>
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>>
>>
>>
>>
>>
>>
>>
>>
>>







Re: TUMBLE函数不支持 回撤流

Posted by 赵一旦 <hi...@gmail.com>.
@LakeShen。 怎么看是append/retract数据流呢?是通过逻辑自己判定还是说有什么flink层面的信息直接反映。

LakeShen <sh...@gmail.com> 于2020年11月4日周三 上午10:12写道:

> Hi 夜思流年梦,
>
>     看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
>     如果是 retract ,应该就不能再上面进行窗口计算了。
>
> Best,
> LakeShen
>
> 史 正超 <sh...@outlook.com> 于2020年11月3日周二 下午6:34写道:
>
> > canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> > UPDATE DELETE, 相关代码如下:
> >
> > @Override
> > public ChangelogMode getChangelogMode() {
> >    return ChangelogMode.newBuilder()
> >       .addContainedKind(RowKind.INSERT)
> >       .addContainedKind(RowKind.UPDATE_BEFORE)
> >       .addContainedKind(RowKind.UPDATE_AFTER)
> >       .addContainedKind(RowKind.DELETE)
> >       .build();
> > }
> >
> > 所以在window里消费带有update和delete的数据现在应该是不支持的。
> > ________________________________
> > 发件人: 夜思流年梦 <li...@163.com>
> > 发送时间: 2020年11月3日 9:46
> > 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> > 主题: TUMBLE函数不支持 回撤流
> >
> >
> >
> >
> > 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
> >
> >
> >
> >
> >
> >
> >
> > 原sql
> >
> > select 0 as id
> >
> > , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
> >
> > ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> > then memberid else NULL end) as paynum_h
> >
> > ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> > 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h
> >
> > from dwd_XXX
> >
> > where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> >
> > group by TUMBLE(proctime ,interval '1' HOUR);
> >
> >
> > 报错:
> >  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > 发现把kafka建表语句改成 json格式就可以
> >
> >
> > 数据源不是flink-mysql-cdc得来的
> >
> >
> > 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
> >
> >
> >  'connector' = 'kafka',
> >   'properties.group.id' = 'XX',
> >   'properties.bootstrap.servers' = 'XX',
> >   'topic' = 'ODS_XXX',
> >   'scan.startup.mode' = 'group-offsets',
> >   'format' = 'canal-json');
> >
> >
> > 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> > 建kafka表的格式,使用的changelog-json:
> >
> >
> > WITH (
> >   'connector' = 'kafka',
> >   'properties.group.id' = 'XX',
> >   'properties.bootstrap.servers' = 'XXX',
> >   'topic' = 'DWD_XXX',
> >   'scan.startup.mode' = 'group-offsets',
> >   'format' = 'changelog-json');
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-10-30 14:53:09,"admin" <17...@163.com> 写道:
> > >Hi,
> > >能贴一下完整的sql吗,数据源是CDC的数据吗?
> > >
> > >> 2020年10月30日 下午2:48,夜思流年梦 <li...@163.com> 写道:
> > >>
> > >> 开发者你好:
> > >> 现有此场景:
> > >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> > >> select
> > >>
> > >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> > >>
> > >>> ,sum(amt) as paymoney_h
> > >>
> > >>> from XXXX
> > >>
> > >>> group by TUMBLE(write_time,interval '1' HOUR);
> > >>
> > >>
> > >> 报错:
> > >> org.apache.flink.table.api.TableException: GroupWindowAggregate
> doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > >>
> > >>
> > >>
> > >>
> > >> 发现把kafka建表语句改成 json格式就可以
> > >>
> > >>
> > >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> >
> >
> >
> >
> >
> >
>

Re: TUMBLE函数不支持 回撤流

Posted by LakeShen <sh...@gmail.com>.
Hi 夜思流年梦,

    看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
    如果是 retract ,应该就不能再上面进行窗口计算了。

Best,
LakeShen

史 正超 <sh...@outlook.com> 于2020年11月3日周二 下午6:34写道:

> canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> UPDATE DELETE, 相关代码如下:
>
> @Override
> public ChangelogMode getChangelogMode() {
>    return ChangelogMode.newBuilder()
>       .addContainedKind(RowKind.INSERT)
>       .addContainedKind(RowKind.UPDATE_BEFORE)
>       .addContainedKind(RowKind.UPDATE_AFTER)
>       .addContainedKind(RowKind.DELETE)
>       .build();
> }
>
> 所以在window里消费带有update和delete的数据现在应该是不支持的。
> ________________________________
> 发件人: 夜思流年梦 <li...@163.com>
> 发送时间: 2020年11月3日 9:46
> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> 主题: TUMBLE函数不支持 回撤流
>
>
>
>
> 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
>
>
>
>
>
>
>
> 原sql
>
> select 0 as id
>
> , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
>
> ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> then memberid else NULL end) as paynum_h
>
> ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h
>
> from dwd_XXX
>
> where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>
> group by TUMBLE(proctime ,interval '1' HOUR);
>
>
> 报错:
>  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> 发现把kafka建表语句改成 json格式就可以
>
>
> 数据源不是flink-mysql-cdc得来的
>
>
> 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
>
>
>  'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XX',
>   'topic' = 'ODS_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'canal-json');
>
>
> 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> 建kafka表的格式,使用的changelog-json:
>
>
> WITH (
>   'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XXX',
>   'topic' = 'DWD_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'changelog-json');
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-30 14:53:09,"admin" <17...@163.com> 写道:
> >Hi,
> >能贴一下完整的sql吗,数据源是CDC的数据吗?
> >
> >> 2020年10月30日 下午2:48,夜思流年梦 <li...@163.com> 写道:
> >>
> >> 开发者你好:
> >> 现有此场景:
> >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> >> select
> >>
> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> >>
> >>> ,sum(amt) as paymoney_h
> >>
> >>> from XXXX
> >>
> >>> group by TUMBLE(write_time,interval '1' HOUR);
> >>
> >>
> >> 报错:
> >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> >>
> >>
> >>
> >>
> >> 发现把kafka建表语句改成 json格式就可以
> >>
> >>
> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>
>
>