Posted to user-zh@flink.apache.org by 陈佳豪 <ja...@yeah.net> on 2023/03/01 10:14:35 UTC

Using Flink SQL to sync Kafka data to MySQL: on a delete in the source Kafka topic, MySQL receives the op=d record and deletes the row, but at the same time re-inserts an older version of that row.

The problem is as the title says: on delete operations the data in the MySQL table ends up wrong; each time, an old version of the row with that primary key is re-inserted.
String kafka = "CREATE TABLE `电话` (    `rowid` VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` VARCHAR(2147483647),`63fd660536521f81a2cfabad` VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH ( 'connector' = 'kafka', 'topic' = 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', 'properties.bootstrap.servers' = '132.232.27.116:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )";

String mysql = "CREATE TABLE `电话_1` (    `rowID` VARCHAR(255),`名称` STRING,`手机` STRING,`座机` VARCHAR(255),    PRIMARY KEY (`rowID`) NOT ENFORCED  )  WITH (    'connector' = 'jdbc',    'driver' = 'com.mysql.cj.jdbc.Driver',    'url' = 'jdbc:mysql://43.136.128.102:6506/meihua_test',    'username' = 'root',    'password' = '123456',    'table-name' = '电话2'  )";

String insert = "insert into `电话_1` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as `座机` from `电话` ) as t_1";

The statements I ran are shown above. Could someone take a look and help clarify? Is it a syntax problem on my side, or a bug in the Flink connector itself?

Re: Re:Re: Re: Using Flink SQL to sync Kafka data to MySQL: on a delete in the source Kafka topic, MySQL receives the op=d record and deletes the row, but at the same time re-inserts an older version of that row.

Posted by Jane Chan <qi...@gmail.com>.
Judging from the plan, a SinkUpsertMaterializer[1] is added at the sink node because no upsert key can be derived; it shuffles (keyBy) on the primary key defined on the sink table[2] and can only guarantee eventual consistency.
Also, in your description of the operations the schema has three columns, but the DDL has four, and the formatting is garbled.

Some suggestions that may help:

1. If the upstream data has a primary key and it is rowid, declare a PK on the Flink source table to avoid generating the extra materializer node (a sketch follows this list); also, when declaring the Flink source table, do not include metadata columns (such as op), as they lead to non-deterministic updates[3].
2. Check that the PK columns of the physical table in MySQL match the PK columns of the Flink SQL sink table.
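
A minimal sketch of suggestion 1, reusing the source DDL from the original post; the only addition is the PRIMARY KEY clause, which assumes rowid really is the unique key upstream:

CREATE TABLE `电话` (
    `rowid` VARCHAR(2147483647),
    `63fd65fb36521f81a2cfab90` VARCHAR(2147483647),
    `63fd660536521f81a2cfabad` VARCHAR(65535),
    `63fd660536521f81a2cfabae` VARCHAR(65535),
    -- assumption: rowid is the upstream primary key
    PRIMARY KEY (`rowid`) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
    'properties.bootstrap.servers' = '132.232.27.116:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
)

With the PK declared on the source, the planner can derive the upsert key, and the plan should no longer show upsertMaterialize=[true]. For suggestion 2, the physical table in MySQL should carry the same key; a hypothetical DDL matching the sink definition would be:

CREATE TABLE `电话2` (
    `rowID` VARCHAR(255) NOT NULL,
    `名称` TEXT,
    `手机` TEXT,
    `座机` VARCHAR(255),
    PRIMARY KEY (`rowID`)
);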

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
[2]
https://github.com/apache/flink/blob/3ea83baad0c8413f8e1f4a027866335d13789538/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L378
[3]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/#31-%e6%b5%81%e4%b8%8a%e7%9a%84%e4%b8%8d%e7%a1%ae%e5%ae%9a%e6%80%a7

Best,
Jane


Re:Re:Re: Re: Using Flink SQL to sync Kafka data to MySQL: on a delete in the source Kafka topic, MySQL receives the op=d record and deletes the row, but at the same time re-inserts an older version of that row.

Posted by 陈佳豪 <ja...@yeah.net>.
I just ran a test.
Assume there are 3 rows of data to sync (full load):
| 编号 | 电话        | 座机 |
| 1    | 13113133333 | 123  |
| 2    | 13113133333 | 456  |
| 3    | 13113133333 | 789  |

Then I modify two fields of the last row, row 3 (incremental):
| 编号 | 电话        | 座机 |
| 1    | 13113133333 | 123  |
| 2    | 13113133333 | 456  |
| 3    | 13113133110 | 888  |
After the update, I delete row 2; checking MySQL, row 2 is correctly deleted and nothing is re-inserted (correct behavior).
Then I go on to delete row 3, and this is where it goes wrong: Flink has buffered data from the earlier modification, so together with the delete it inserts the old pre-update row back into MySQL (incorrect behavior).

The above is what I observed testing on Flink 1.16.1. I don't know whether something needs to be configured in Flink or in the downstream operator, and if so what. This problem has been bothering me for 3 weeks, and none of the tests and adjustments I've tried have made a difference.

Re:Re: Re: Using Flink SQL to sync Kafka data to MySQL: on a delete in the source Kafka topic, MySQL receives the op=d record and deletes the row, but at the same time re-inserts an older version of that row.

Posted by 陈佳豪 <ja...@yeah.net>.
Hi, good morning.
I upgraded Flink to 1.16.1 and ran the Kafka-to-MySQL sync job again; the same problem still occurs. I ran EXPLAIN locally, and the output is as follows:

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机])
+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"])
   +- LogicalTableScan(table=[[default_catalog, default_database, 电话]])


== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true])
+- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS 座机])
   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae])


== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true])
+- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647)) AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255)) AS 座机])
   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae])




Re: Re: Using Flink SQL to sync Kafka data to MySQL: on a delete in the source Kafka topic, MySQL receives the op=d record and deletes the row, but at the same time re-inserts an older version of that row.

Posted by Jane Chan <qi...@gmail.com>.
Hi,

Sorry, that was a typo; it should be 1.16.1. I verified the query you posted earlier on 1.16.1, and the delete works correctly there. You could try it on 1.16.1, or on 1.15.2 use EXPLAIN CHANGELOG_MODE INSERT INTO...[1] to print the plan and take a look.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/

Best,
Jane


Re:Re: Using Flink SQL to sync Kafka data to MySQL: on a delete in the source Kafka topic, MySQL receives the op=d record and deletes the row, but at the same time re-inserts an older version of that row.

Posted by 陈佳豪 <ja...@yeah.net>.
Hi,
There's no 1.16.2 release yet, right? On the Flink site I only see 1.16.0 and 1.16.1.

Re: Using Flink SQL to sync Kafka data to MySQL: on a delete in the source Kafka topic, MySQL receives the op=d record and deletes the row, but at the same time re-inserts an older version of that row.

Posted by Jane Chan <qi...@gmail.com>.
Hi,

You can try printing the plan with EXPLAIN CHANGELOG_MODE[1] to see what is happening (a sketch follows the link below), or try upgrading to Flink 1.16; this query was verified to work without the problem on 1.16.2.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
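
As a concrete sketch, the statement could look like the following, using a simplified (flattened) form of the insert from the original post:

EXPLAIN CHANGELOG_MODE
INSERT INTO `电话_1`
SELECT
    `rowid` AS `rowID`,
    `63fd65fb36521f81a2cfab90` AS `名称`,
    `63fd660536521f81a2cfabad` AS `手机`,
    `63fd660536521f81a2cfabae` AS `座机`
FROM `电话`;

The printed plan annotates each operator with its changelog mode (e.g. [I,UB,UA,D]), which shows whether delete messages actually reach the sink.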

Best,
Jane


Re: Using Flink SQL to sync Kafka data to MySQL: on a delete in the source Kafka topic, MySQL receives the op=d record and deletes the row, but at the same time re-inserts an older version of that row.

Posted by 陈佳豪 <ja...@yeah.net>.
Flink, the Kafka connector, and the JDBC connector are all on version 1.15.2.