You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by LittleFall <15...@qq.com> on 2020/09/14 03:37:07 UTC
Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失
Flink 版本:
flink:1.11.1-scala_2.12
连接器
mysql-connector-java-8.0.21
flink-sql-connector-kafka_2.12-1.11.1
flink-connector-jdbc_2.12-1.11.1
Flink SQL:
CREATE TABLE source_user_name (
loan_no int,
name varchar,
PRIMARY KEY (loan_no) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'test.username',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test_flink_name_group',
'format'='canal-json',
'scan.startup.mode' = 'group-offsets'
);
CREATE TABLE test_flink_name_sink (
loan_no int,
name varchar,
PRIMARY KEY (loan_no) NOT ENFORCED
) WITH (
'connector.type' = 'jdbc',
'connector.url' =
'jdbc:mysql://host.docker.internal:3306/test?&rewriteBatchedStatements=true',
'connector.table' = 'username',
'connector.driver' = 'com.mysql.cj.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '',
'connector.write.flush.max-rows' = '5000',
'connector.write.flush.interval' = '1s'
);
insert into test_flink_name_sink (loan_no,name)
select loan_no,name from source_user_name;
外部 sql:
CREATE TABLE username (
loan_no int PRIMARY KEY,
name varchar(10)
);
insert into username values (1,'a');
架构是 mysql-canal-kafka-flink-mysql
同时执行(一次输入两行)
UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1;
UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1;
发现目标数据库中结果丢失,结果稳定复现。
分析原因:
```
上游一个update下游会落地两个sql
1.insert into after value
2.delete before value
而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch
如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a
这个时候就会触发问题
insert batch结束之后数据变成了id:1,name:a
再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了
第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了
```
换成新版 JDBC 配置之后没有这个问题。
请问这是已经发现的问题吗?有没有 issue 号
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失
Posted by Michael Ran <gr...@163.com>.
update 怎么触发的 delete 哦?
在 2020-09-14 11:37:07,"LittleFall" <15...@qq.com> 写道:
>Flink 版本:
>flink:1.11.1-scala_2.12
>连接器
>mysql-connector-java-8.0.21
>flink-sql-connector-kafka_2.12-1.11.1
>flink-connector-jdbc_2.12-1.11.1
>
>Flink SQL:
>
>CREATE TABLE source_user_name (
> loan_no int,
> name varchar,
> PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test.username',
> 'properties.bootstrap.servers' = 'kafka:9092',
> 'properties.group.id' = 'test_flink_name_group',
> 'format'='canal-json',
> 'scan.startup.mode' = 'group-offsets'
>);
>
>CREATE TABLE test_flink_name_sink (
> loan_no int,
> name varchar,
> PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' =
>'jdbc:mysql://host.docker.internal:3306/test?&rewriteBatchedStatements=true',
> 'connector.table' = 'username',
> 'connector.driver' = 'com.mysql.cj.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = '',
> 'connector.write.flush.max-rows' = '5000',
> 'connector.write.flush.interval' = '1s'
>);
>
>insert into test_flink_name_sink (loan_no,name)
>select loan_no,name from source_user_name;
>
>
>外部 sql:
>
>CREATE TABLE username (
> loan_no int PRIMARY KEY,
> name varchar(10)
>);
>
>insert into username values (1,'a');
>
>架构是 mysql-canal-kafka-flink-mysql
>
>同时执行(一次输入两行)
>
>UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1;
>UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1;
>
>发现目标数据库中结果丢失,结果稳定复现。
>
>分析原因:
>
>```
>上游一个update下游会落地两个sql
>1.insert into after value
>2.delete before value
>而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch
>
>如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a
>这个时候就会触发问题
>insert batch结束之后数据变成了id:1,name:a
>再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了
>第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了
>```
>
>换成新版 JDBC 配置之后没有这个问题。
>
>请问这是已经发现的问题吗?有没有 issue 号
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
flink1.8.1处理utc时间方式
Posted by "zjfplayer@hotmail.com" <zj...@hotmail.com>.
请问下各位,flink1.8.1,flink sql(非java代码转化)方式下,数据格式'2018-1-2T12:00Z'如何sink到oracle的date字段,to_timestamp函数在1.8中还没有
Re:Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失
Posted by Michael Ran <gr...@163.com>.
update 怎么触发的 delete 哦?
在 2020-09-14 11:37:07,"LittleFall" <15...@qq.com> 写道:
>Flink 版本:
>flink:1.11.1-scala_2.12
>连接器
>mysql-connector-java-8.0.21
>flink-sql-connector-kafka_2.12-1.11.1
>flink-connector-jdbc_2.12-1.11.1
>
>Flink SQL:
>
>CREATE TABLE source_user_name (
> loan_no int,
> name varchar,
> PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test.username',
> 'properties.bootstrap.servers' = 'kafka:9092',
> 'properties.group.id' = 'test_flink_name_group',
> 'format'='canal-json',
> 'scan.startup.mode' = 'group-offsets'
>);
>
>CREATE TABLE test_flink_name_sink (
> loan_no int,
> name varchar,
> PRIMARY KEY (loan_no) NOT ENFORCED
>) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' =
>'jdbc:mysql://host.docker.internal:3306/test?&rewriteBatchedStatements=true',
> 'connector.table' = 'username',
> 'connector.driver' = 'com.mysql.cj.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = '',
> 'connector.write.flush.max-rows' = '5000',
> 'connector.write.flush.interval' = '1s'
>);
>
>insert into test_flink_name_sink (loan_no,name)
>select loan_no,name from source_user_name;
>
>
>外部 sql:
>
>CREATE TABLE username (
> loan_no int PRIMARY KEY,
> name varchar(10)
>);
>
>insert into username values (1,'a');
>
>架构是 mysql-canal-kafka-flink-mysql
>
>同时执行(一次输入两行)
>
>UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1;
>UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1;
>
>发现目标数据库中结果丢失,结果稳定复现。
>
>分析原因:
>
>```
>上游一个update下游会落地两个sql
>1.insert into after value
>2.delete before value
>而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch
>
>如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a
>这个时候就会触发问题
>insert batch结束之后数据变成了id:1,name:a
>再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了
>第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了
>```
>
>换成新版 JDBC 配置之后没有这个问题。
>
>请问这是已经发现的问题吗?有没有 issue 号
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Re: Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失
Posted by Michael Ran <gr...@163.com>.
有主键吗? 有的话不会触发delete 才对
在 2020-09-28 15:54:49,"Leonard Xu" <xb...@gmail.com> 写道:
>
>
>> 在 2020年9月15日,16:52,LittleFall <15...@qq.com> 写道:
>>
>> 谢谢,请问有相关的 issue 链接吗
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>To @LItteFall :
>
>没有对应的issue,因为是在修复changlog issue[1]时在这个issue里一起修复的,代码可以看下TableBufferReducedStatementExecutor里reduceBuffer保证是对同key上得不同操作顺序执行的。
>
>To @Michael Ran:
>
>update 怎么触发的 delete 哦?
>
>LItteFall 是在数据库的表中触发了update操作,然后数据库的binlog通过 CDC工具 canal 以 canal-json 格式写入到kafka的表中,一个update 会对应UPDATE_BEFORE,UPDATE_AFTER两条数据, JDBC connector 对应的处理会生成两条sql, 一条delete和一条insert.
>
>
>祝好
>Leonard
>[1]https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues <https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues>
Re: Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失
Posted by Leonard Xu <xb...@gmail.com>.
> 在 2020年9月15日,16:52,LittleFall <15...@qq.com> 写道:
>
> 谢谢,请问有相关的 issue 链接吗
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
To @LItteFall :
没有对应的issue,因为是在修复changlog issue[1]时在这个issue里一起修复的,代码可以看下TableBufferReducedStatementExecutor里reduceBuffer保证是对同key上得不同操作顺序执行的。
To @Michael Ran:
update 怎么触发的 delete 哦?
LItteFall 是在数据库的表中触发了update操作,然后数据库的binlog通过 CDC工具 canal 以 canal-json 格式写入到kafka的表中,一个update 会对应UPDATE_BEFORE,UPDATE_AFTER两条数据, JDBC connector 对应的处理会生成两条sql, 一条delete和一条insert.
祝好
Leonard
[1]https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues <https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues>
Re: Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失
Posted by LittleFall <15...@qq.com>.
谢谢,请问有相关的 issue 链接吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失
Posted by Jark Wu <im...@gmail.com>.
Hi,
这个是我们在实现新版 JDBC sink 的时候发现的问题,所以新版中修复了这个问题。 由于旧版 JDBC
修复起来比较麻烦,可能导致不兼容,且旧版在不久的将来会移除,所以目前没有去修复。
Best,
Jark
On Mon, 14 Sep 2020 at 11:38, LittleFall <15...@qq.com> wrote:
> Flink 版本:
> flink:1.11.1-scala_2.12
> 连接器
> mysql-connector-java-8.0.21
> flink-sql-connector-kafka_2.12-1.11.1
> flink-connector-jdbc_2.12-1.11.1
>
> Flink SQL:
>
> CREATE TABLE source_user_name (
> loan_no int,
> name varchar,
> PRIMARY KEY (loan_no) NOT ENFORCED
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test.username',
> 'properties.bootstrap.servers' = 'kafka:9092',
> 'properties.group.id' = 'test_flink_name_group',
> 'format'='canal-json',
> 'scan.startup.mode' = 'group-offsets'
> );
>
> CREATE TABLE test_flink_name_sink (
> loan_no int,
> name varchar,
> PRIMARY KEY (loan_no) NOT ENFORCED
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' =
>
> 'jdbc:mysql://host.docker.internal:3306/test?&rewriteBatchedStatements=true',
> 'connector.table' = 'username',
> 'connector.driver' = 'com.mysql.cj.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = '',
> 'connector.write.flush.max-rows' = '5000',
> 'connector.write.flush.interval' = '1s'
> );
>
> insert into test_flink_name_sink (loan_no,name)
> select loan_no,name from source_user_name;
>
>
> 外部 sql:
>
> CREATE TABLE username (
> loan_no int PRIMARY KEY,
> name varchar(10)
> );
>
> insert into username values (1,'a');
>
> 架构是 mysql-canal-kafka-flink-mysql
>
> 同时执行(一次输入两行)
>
> UPDATE `username` SET `name` = 'b' WHERE `loan_no` = 1;
> UPDATE `username` SET `name` = 'a' WHERE `loan_no` = 1;
>
> 发现目标数据库中结果丢失,结果稳定复现。
>
> 分析原因:
>
> ```
> 上游一个update下游会落地两个sql
> 1.insert into after value
> 2.delete before value
> 而且insert和delete是分开两个statement batch提交的。先insert batch再delete batch
>
> 如果上游同时有两个update,update逻辑id:1,name:a更新为id:1,name:b再更新为id:1,name:a
> 这个时候就会触发问题
> insert batch结束之后数据变成了id:1,name:a
> 再执行delete batch的第一条before delete:delete id=1 name=a会直接把最终正确的数据删除了
> 第二条的before delete:delete id=1 name=b删除不到数据。因为数据已经被删除了
> ```
>
> 换成新版 JDBC 配置之后没有这个问题。
>
> 请问这是已经发现的问题吗?有没有 issue 号
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>