You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 鱼子酱 <38...@qq.com> on 2020/11/16 10:04:23 UTC
flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效
请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多,
是目前不支持还是我使用的方法不对呢?
版本:flink 1.11.1
关键的2个sql如下
create table open_and_close_terminal_minute_1 (
request_date varchar
,terminal_no varchar
,logon_time varchar
,logout_time varchar
,insert_time varchar
,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED
) with (
'connector' = 'jd……
'url' = 'jdbc:mys……se',
'table-name' = 'c……,
'driver' = 'com.m……
'username' = 'ana……
'password' = 'ana……
'sink.buffer-flus……
)
upsert into open_and_close_terminal_minute_1
select request_date ,terminal_no ,logon_time ,logout_time
,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from
( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date
,cast(terminalNo as varchar) as terminal_no
,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time
,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time
from caslog INNER join itoa_b_terminal_shop for system_time as of
caslog.proc_time
on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey
where
errCode=0 and attr=0
group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo
)
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效
Posted by Leonard Xu <xb...@gmail.com>.
Hi,
你确定是在Flink SQL 里使用 upsert 语法? 我理解是不支持的
另外你flink里声明connector DDL 中的主键应该和你在Mysql表的主键一致。
祝好
Leonard
> 在 2020年11月17日,09:12,鱼子酱 <38...@qq.com> 写道:
>
> upsert
回复: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效
Posted by 史 正超 <sh...@outlook.com>.
你的sql里用的是 Tumble窗口,不是一个回撤流,不会有更新的,只有insert
________________________________
发件人: 鱼子酱 <38...@qq.com>
发送时间: 2020年11月17日 1:12
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效
我写的是upsert呀。。。
insert into 我也测试了,也不行。
是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效
Posted by hailongwang <18...@163.com>.
Hello,
我使用 MySQLDialect 在本地确认了下,
1. 在数据库需要建主键,因为建了主键 “INSERT INTO ... ON DUPLICATE KEY UPDATE”[1] 语句的 upsert 语义才会生效。
2. 需要在 DDL 中定义 'PRIMARY KEY',因为需要根据 ‘PRIMARY KEY’ 确认是否使用 'upsert query' [2]
[1] https://github.com/apache/flink/blob/7eb514a59f6fd117c3535ec4bebc40a375f30b63/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/MySQLDialect.java#L76
[2] https://github.com/apache/flink/blob/7eb514a59f6fd117c3535ec4bebc40a375f30b63/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatBuilder.java#L98
在 2020-11-17 09:28:14,"Tio Planto" <zo...@gmail.com> 写道:
>需要建立mysql表request_date,terminal_no的联合主键,mysql的upsert是基于"INSERT INTO ... ON
>DUPLICATE KEY UPDATE..."实现的。
>ddl中可以不声明mysql主健。
>
>鱼子酱 <38...@qq.com>于2020年11月17日 周二09:13写道:
>
>> 我写的是upsert呀。。。
>> insert into 我也测试了,也不行。
>>
>> 是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效
Posted by Tio Planto <zo...@gmail.com>.
需要建立mysql表request_date,terminal_no的联合主键,mysql的upsert是基于"INSERT INTO ... ON
DUPLICATE KEY UPDATE..."实现的。
ddl中可以不声明mysql主健。
鱼子酱 <38...@qq.com>于2020年11月17日 周二09:13写道:
> 我写的是upsert呀。。。
> insert into 我也测试了,也不行。
>
> 是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效
Posted by 鱼子酱 <38...@qq.com>.
我写的是upsert呀。。。
insert into 我也测试了,也不行。
是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效
Posted by hailongwang <18...@163.com>.
Hi,
这个版本是支持的。
其中插入语句是 "insert into " 而不是 “update into”?
在 2020-11-16 17:04:23,"鱼子酱" <38...@qq.com> 写道:
>请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多,
>是目前不支持还是我使用的方法不对呢?
>版本:flink 1.11.1
>
>关键的2个sql如下
>
> create table open_and_close_terminal_minute_1 (
> request_date varchar
> ,terminal_no varchar
> ,logon_time varchar
> ,logout_time varchar
> ,insert_time varchar
> ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED
> ) with (
> 'connector' = 'jd……
> 'url' = 'jdbc:mys……se',
> 'table-name' = 'c……,
> 'driver' = 'com.m……
> 'username' = 'ana……
> 'password' = 'ana……
> 'sink.buffer-flus……
> )
>
> upsert into open_and_close_terminal_minute_1
> select request_date ,terminal_no ,logon_time ,logout_time
>,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from
> ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date
> ,cast(terminalNo as varchar) as terminal_no
> ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time
> ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time
> from caslog INNER join itoa_b_terminal_shop for system_time as of
>caslog.proc_time
> on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey
> where
> errCode=0 and attr=0
> group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo
>
> )
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/