You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 陈佳豪 <ja...@yeah.net> on 2023/02/24 06:19:54 UTC
使用flink sql 将kafka的数据同步到mysql无法删除。
-----建表语法如下
String kafka = "CREATE TABLE `电话` " +
"( `rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机` VARCHAR(255), " +
" PRIMARY KEY (`rowID`) NOT ENFORCED ) " +
" WITH " +
"( 'connector' = 'jdbc', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'url' = 'jdbc:mysql://XXXXXX:6506/meihua_test', " +
" 'username' = 'root', " +
" 'password' = '123456', " +
" 'table-name' = '电话' )";
String mysql = "CREATE TABLE `电话_1` " +
"( `rowid` VARCHAR(100)," +
"`63f73b332e77497da91286f0` VARCHAR(100)," +
"`63f73b3f2e77497da91286fb` VARCHAR(100)," +
"`63f73b3f2e77497da91286fc` VARCHAR(100)," +
"`op` STRING ," +
" PRIMARY KEY (rowid) NOT ENFORCED )" +
" WITH " +
"( 'connector' = 'kafka', " +
"'topic' = 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," +
" 'properties.bootstrap.servers' = 'XXXXXX:9092'," +
" 'scan.startup.mode' = 'earliest-offset', " +
"'format' = 'debezium-json' )";
-----执行语句如下
String insert = "insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
" ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机` from `电话_1` ) as t_1";
-----操作数据如下
String insert = "insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
" ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机` from `电话_1` ) as t_1";
-----执行语句如下
{
"op":"d",
"before":{
"rowid":"f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d"
},
"after":null
}
现在的结论是可以新增和修改,但是无法删除。难道insert into这个语句搞不定吗? 走的debezuim json序列化的格式。
各位大佬帮看下 谢谢。
Re: 使用flink sql 将kafka的数据同步到mysql无法删除。
Posted by Jane Chan <qi...@gmail.com>.
Hi,
原问题中 String 变量 kafka 和 mysql 赋值反了, 以及能提供下所使用的 flink 版本吗, 我使用 1.16.1 没有复现此问题
payload
{
"before": {
"rowid": "f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d",
"63f73b332e77497da91286f0": "Jerry",
"63f73b3f2e77497da91286fb": "mobile number",
"63f73b3f2e77497da91286fc": "telephone number"
},
"after": null,
"source": {...},
"op": "d",
"ts_ms": 1677342340042,
"transaction": null
}
flink sql
Flink SQL> insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as
`名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `手机` from (select `rowid` as `rowID`,
`63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`,
`63f73b3f2e77497da91286fc` as `座机` from `电话_1`) as t_1;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8490c9530d3a97e73aeedfe9745f2fe3
mysql output
mysql> select * from 电话;
+
--------------------------------------+--------+---------------+------------------+
| rowID | 名称 | 手机 | 座机
|
+
--------------------------------------+--------+---------------+------------------+
| f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Tom | mobile number | telephone
number |
+
--------------------------------------+--------+---------------+------------------+
1 row in set (0.00 sec)
mysql> select * from 电话;
+
--------------------------------------+--------+---------------+------------------+
| rowID | 名称 | 手机 | 座机
|
+
--------------------------------------+--------+---------------+------------------+
| f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Jerry | mobile number | telephone
number |
+
--------------------------------------+--------+---------------+------------------+
1 row in set (0.00 sec)
mysql> select * from 电话;
Empty set (0.00 sec)
Best,
Jane
On Fri, Feb 24, 2023 at 2:21 PM 陈佳豪 <ja...@yeah.net> wrote:
> -----建表语法如下
> String kafka = "CREATE TABLE `电话` " +
> "( `rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机`
> VARCHAR(255), " +
> " PRIMARY KEY (`rowID`) NOT ENFORCED ) " +
> " WITH " +
> "( 'connector' = 'jdbc', " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> " 'url' = 'jdbc:mysql://XXXXXX:6506/meihua_test', " +
> " 'username' = 'root', " +
> " 'password' = '123456', " +
> " 'table-name' = '电话' )";
>
> String mysql = "CREATE TABLE `电话_1` " +
> "( `rowid` VARCHAR(100)," +
> "`63f73b332e77497da91286f0` VARCHAR(100)," +
> "`63f73b3f2e77497da91286fb` VARCHAR(100)," +
> "`63f73b3f2e77497da91286fc` VARCHAR(100)," +
> "`op` STRING ," +
> " PRIMARY KEY (rowid) NOT ENFORCED )" +
> " WITH " +
> "( 'connector' = 'kafka', " +
> "'topic' =
> 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," +
> " 'properties.bootstrap.servers' = 'XXXXXX:9092'," +
> " 'scan.startup.mode' = 'earliest-offset', " +
> "'format' = 'debezium-json' )";
> -----执行语句如下
> String insert = "insert into `电话` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
> " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as
> `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机`
> from `电话_1` ) as t_1";
> -----操作数据如下
>
>
> String insert = "insert into `电话` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
> " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as
> `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机`
> from `电话_1` ) as t_1";
> -----执行语句如下
> {
> "op":"d",
> "before":{
> "rowid":"f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d"
> },
> "after":null
> }
> 现在的结论是可以新增和修改,但是无法删除。难道insert into这个语句搞不定吗? 走的debezuim json序列化的格式。
> 各位大佬帮看下 谢谢。