You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by xu yihan <my...@126.com> on 2020/06/04 09:59:49 UTC
flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
举个例子比如我想要
insert into mysql_sink
select
ID,
amount,
………
from source
groupby ID;
这里就是想按照id为key,在数据库里更新这个id对应的amount等其他值。
但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,必须在groupby后面加上所有select后面的项。
但这样带来一个问题,这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。
请问大家有什么办法能只指定部分字段为key来规避掉这个问题,谢谢。
之前发过一封类似的邮件,没有回复,只好再发一封,见谅。
发自我的iPhone
Re: flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
Posted by godfrey he <go...@gmail.com>.
hi yihan,
如 Leonard 所说,你可以考虑使用 first_value, last_value 等聚合函数和赛选其他字段。
1.11开始支持ddl定义pk信息, 如果id在source表中也是pk字段,可以直接定义,
planner会利用该信息传递pk到sink表。
Bests,
Godfrey
Leonard Xu <xb...@gmail.com> 于2020年6月4日周四 下午9:01写道:
> Hi,
>
> > 但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,
>
> select后费聚合值可以通过max()或sum()来取,因为已经按照key group by了,所以取出来的非聚合值只能有一条,
>
> > 这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。
>
> State 可以配置ttl的,过期清理参考[1]
>
> 另外,即将发布的1.11中,支持在jdbc table 上定义primary key, 不用强制要求写upsert 的query,文档正在撰写中[2]
>
> Best,
> Leonard Xu
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
> >
> [2] https://issues.apache.org/jira/browse/FLINK-17829 <
> https://issues.apache.org/jira/browse/FLINK-17829>
>
回复: flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
Posted by 1048262223 <10...@qq.com>.
Hi
这里存在一个问题是,使用了last_value或者first_value这样的udaf,但是如果多条数据来到经过udaf处理后结果还是和之前一样的情况下,是不会产出回撤流数据的,可以观察下你是否需要考虑这种情况。
Best,
Yichao Yang
------------------ 原始邮件 ------------------
发件人: "x2009438"<x2009438@126.com>;
发送时间: 2020年6月5日(星期五) 上午9:06
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
感谢各位,
我先试试用Last_value这样的aggregate function绕过去。
@kcz
可能我表达不很清楚,具体描述一下遇到的具体场景就是:收到原始的数据,去mysql或者es里做维表关联,然后再以upsert的模式将结果写回mysql或es。
举个例子来说,我想按id为key更新整行数据(比如还有个字段amount是个随机的double类型值)
select
id,
amount,
……
groupby id;
这样子不行,必须
select
id,
amount,
…
groupby id,amount,……;
这样子,假设id是固定有限个,那么state个数应该也是有限的,但是加上amount,我理解state的状态就会+♾了。
也就是说我实际并不需要分组聚合,保存状态,用groupby只是为了让connector识别到keyfields。
发自我的iPhone
> 在 2020年6月5日,08:46,kcz <573693104@qq.com> 写道:
>
> 我大概get到你要说的需求,select那些其实是明细数据?但是没有跟聚合的数据拆开,所以才出现这种情况吧?
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: Leonard Xu <xbjtdcq@gmail.com&gt;
> 发送时间: 2020年6月4日 21:01
> 收件人: user-zh <user-zh@flink.apache.org&gt;
> 主题: 回复:flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
>
>
>
> Hi,
>
> &gt; 但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,
>
> select后费聚合值可以通过max()或sum()来取,因为已经按照key group by了,所以取出来的非聚合值只能有一条,
>
> &gt; 这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。
>
> State 可以配置ttl的,过期清理参考[1]
>
> 另外,即将发布的1.11中,支持在jdbc table 上定义primary key, 不用强制要求写upsert 的query,文档正在撰写中[2]
>
> Best,
> Leonard Xu
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time&gt;
> [2] https://issues.apache.org/jira/browse/FLINK-17829 <https://issues.apache.org/jira/browse/FLINK-17829&gt;
Re: flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
Posted by x2009438 <x2...@126.com>.
感谢各位,
我先试试用Last_value这样的aggregate function绕过去。
@kcz
可能我表达不很清楚,具体描述一下遇到的具体场景就是:收到原始的数据,去mysql或者es里做维表关联,然后再以upsert的模式将结果写回mysql或es。
举个例子来说,我想按id为key更新整行数据(比如还有个字段amount是个随机的double类型值)
select
id,
amount,
……
groupby id;
这样子不行,必须
select
id,
amount,
…
groupby id,amount,……;
这样子,假设id是固定有限个,那么state个数应该也是有限的,但是加上amount,我理解state的状态就会+♾了。
也就是说我实际并不需要分组聚合,保存状态,用groupby只是为了让connector识别到keyfields。
发自我的iPhone
> 在 2020年6月5日,08:46,kcz <57...@qq.com> 写道:
>
> 我大概get到你要说的需求,select那些其实是明细数据?但是没有跟聚合的数据拆开,所以才出现这种情况吧?
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: Leonard Xu <xbjtdcq@gmail.com>
> 发送时间: 2020年6月4日 21:01
> 收件人: user-zh <user-zh@flink.apache.org>
> 主题: 回复:flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
>
>
>
> Hi,
>
> > 但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,
>
> select后费聚合值可以通过max()或sum()来取,因为已经按照key group by了,所以取出来的非聚合值只能有一条,
>
> > 这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。
>
> State 可以配置ttl的,过期清理参考[1]
>
> 另外,即将发布的1.11中,支持在jdbc table 上定义primary key, 不用强制要求写upsert 的query,文档正在撰写中[2]
>
> Best,
> Leonard Xu
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time>
> [2] https://issues.apache.org/jira/browse/FLINK-17829 <https://issues.apache.org/jira/browse/FLINK-17829>
回复:flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
Posted by kcz <57...@qq.com>.
我大概get到你要说的需求,select那些其实是明细数据?但是没有跟聚合的数据拆开,所以才出现这种情况吧?
------------------ 原始邮件 ------------------
发件人: Leonard Xu <xbjtdcq@gmail.com>
发送时间: 2020年6月4日 21:01
收件人: user-zh <user-zh@flink.apache.org>
主题: 回复:flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
Hi,
> 但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,
select后费聚合值可以通过max()或sum()来取,因为已经按照key group by了,所以取出来的非聚合值只能有一条,
> 这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。
State 可以配置ttl的,过期清理参考[1]
另外,即将发布的1.11中,支持在jdbc table 上定义primary key, 不用强制要求写upsert 的query,文档正在撰写中[2]
Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time>
[2] https://issues.apache.org/jira/browse/FLINK-17829 <https://issues.apache.org/jira/browse/FLINK-17829>
Re: flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
Posted by Leonard Xu <xb...@gmail.com>.
Hi,
> 但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,
select后费聚合值可以通过max()或sum()来取,因为已经按照key group by了,所以取出来的非聚合值只能有一条,
> 这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。
State 可以配置ttl的,过期清理参考[1]
另外,即将发布的1.11中,支持在jdbc table 上定义primary key, 不用强制要求写upsert 的query,文档正在撰写中[2]
Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time>
[2] https://issues.apache.org/jira/browse/FLINK-17829 <https://issues.apache.org/jira/browse/FLINK-17829>
回复:flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
Posted by 1048262223 <10...@qq.com>.
Hi
非活动key的状态是可清理的,可参考
https://blog.csdn.net/lp284558195/article/details/104609739
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#:~:text=The%20minimum%20idle%20state%20retention,kept%20before%20it%20is%20removed.
Best,
Yichao Yang
------------------ 原始邮件 ------------------
发件人: "xu yihan"<mytardis@126.com>;
发送时间: 2020年6月4日(星期四) 下午5:59
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗
举个例子比如我想要
insert into mysql_sink
select
ID,
amount,
………
from source
groupby ID;
这里就是想按照id为key,在数据库里更新这个id对应的amount等其他值。
但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,必须在groupby后面加上所有select后面的项。
但这样带来一个问题,这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。
请问大家有什么办法能只指定部分字段为key来规避掉这个问题,谢谢。
之前发过一封类似的邮件,没有回复,只好再发一封,见谅。
发自我的iPhone