You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2021/06/08 06:05:17 UTC

flink sql cdc数据同步至mysql

flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?

Re:Re: flink sql cdc数据同步至mysql

Posted by "casel.chen" <ca...@126.com>.
请问 flink sql cdc 场景下如何增大下游sink端并行度?
我试了修改default.parallism=2参数,并且将operator chain参数设置成false,并没有效果。
而后,我将作业分成两步:首先 源mysql cdc sink到 upsert kafka,再从 upsert kafka sink到 目标mysql。是想通过kafka partition增大sink并行度
初步测试效果是可以的,kafka建了3个partitions,每个partitions都按主键hash分配到数据,下游并行度跟partitions个数对齐。


以下是作业内容:


-- source
CREATE TABLE mysql_old_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'flink-test',
'table-name' = 'old_order'
);

-- sink
CREATE TABLE kafka_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'order',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);

-- insert
INSERT INTO kafka_order_table SELECT * FROM mysql_old_order_table;







-- source
CREATE TABLE kafka_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'order',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);

-- sink
CREATE TABLE mysql_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/flink-test',
'table-name' = 'order',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.max-rows' = '3',
'sink.buffer-flush.interval' = '1s'
);

-- insert
INSERT INTO mysql_order_table SELECT * FROM kafka_order_table;





在 2021-06-08 19:49:40,"Leonard Xu" <xb...@gmail.com> 写道:
>试着回答下这两个问题。
>
>> flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
>是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc connector支持多并发读取,下游sink自然就能解决。
>
>
>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?
>
>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 
>
>祝好,
>Leonard

Re:Re: flink sql cdc数据同步至mysql

Posted by "casel.chen" <ca...@126.com>.


针对现在flink sql cdc下游并行度无法修改问题,是否可以分两步实现?谢谢!
1. flink sql cdc发到下游kafka,通过 upsert kafka connector,以debezium或canal格式,kafka topic开多个分区
2. 再从kafka消费,通过flink sql同步到最终mysql库














在 2021-06-08 19:49:40,"Leonard Xu" <xb...@gmail.com> 写道:
>试着回答下这两个问题。
>
>> flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
>是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc connector支持多并发读取,下游sink自然就能解决。
>
>
>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?
>
>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 
>
>祝好,
>Leonard

Re: flink sql cdc数据同步至mysql

Posted by Leonard Xu <xb...@gmail.com>.
试着回答下这两个问题。

> flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc connector支持多并发读取,下游sink自然就能解决。


> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?

这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 

祝好,
Leonard

Re:flink sql cdc数据同步至mysql

Posted by "casel.chen" <ca...@126.com>.
即使下游sink能加大并行度,也不能确保上游同一个PK记录会流入到同一个task,也就无法保证操作同一条记录的顺序能正确replay,不是么?

















在 2021-06-11 19:30:39,"东东" <do...@163.com> 写道:
>
>
>
>1、升级到1.13
>2、能不能追上要看写入量到底有多大,以及下游的处理能力啊,就是mysql自己的主从复制也不一定能确保追上,实践就知道了。
>3、可以设置一下default.parallism试试,如果发现被chain到一起了,可以把operator chain关掉试试。
>
>
>在 2021-06-11 18:57:36,"casel.chen" <ca...@126.com> 写道:
>>我的场景就是简单的数据同步,没有join也没有group by,从一个mysql库同步到另一个mysql库。
>>上游写入数据速度很快,如果用flink sql同步的话默认只有一个并行度写,速度会跟不上,这种情况要怎么处理?
>>用的是flink 1.12.1 其jdbc connector还不支持sink.parallism参数
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2021-06-11 16:32:00,"东东" <do...@163.com> 写道:
>>>
>>>
>>>
>>>他这里列举的case是sink前发生了基于非pk的shuffle,比如说有join而且join条件不是主键,你的场景是怎样的呢?
>>>
>>>
>>>
>>>
>>>另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash shuffle,确保相同pk的记录发到同一个sink task。
>>>
>>>
>>>在 2021-06-11 15:57:29,"casel.chen" <ca...@126.com> 写道:
>>>>引用 Leonard Xu大佬之前的回答:
>>>>
>>>>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?
>>>>
>>>>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
>>>>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 
>>>>
>>>>说明加 sink.parallelism 是不行的
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>在 2021-06-11 15:44:51,"JasonLee" <17...@163.com> 写道:
>>>>>hi
>>>>>
>>>>>sink 端可以通过 sink.parallelism 进行设置.
>>>>>
>>>>>
>>>>>
>>>>>-----
>>>>>Best Wishes
>>>>>JasonLee
>>>>>--
>>>>>Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re:Re:Re:Re:flink sql cdc数据同步至mysql

Posted by 东东 <do...@163.com>.


1、升级到1.13
2、能不能追上要看写入量到底有多大,以及下游的处理能力啊,就是mysql自己的主从复制也不一定能确保追上,实践就知道了。
3、可以设置一下default.parallism试试,如果发现被chain到一起了,可以把operator chain关掉试试。


在 2021-06-11 18:57:36,"casel.chen" <ca...@126.com> 写道:
>我的场景就是简单的数据同步,没有join也没有group by,从一个mysql库同步到另一个mysql库。
>上游写入数据速度很快,如果用flink sql同步的话默认只有一个并行度写,速度会跟不上,这种情况要怎么处理?
>用的是flink 1.12.1 其jdbc connector还不支持sink.parallism参数
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-06-11 16:32:00,"东东" <do...@163.com> 写道:
>>
>>
>>
>>他这里列举的case是sink前发生了基于非pk的shuffle,比如说有join而且join条件不是主键,你的场景是怎样的呢?
>>
>>
>>
>>
>>另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash shuffle,确保相同pk的记录发到同一个sink task。
>>
>>
>>在 2021-06-11 15:57:29,"casel.chen" <ca...@126.com> 写道:
>>>引用 Leonard Xu大佬之前的回答:
>>>
>>>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?
>>>
>>>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
>>>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 
>>>
>>>说明加 sink.parallelism 是不行的
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2021-06-11 15:44:51,"JasonLee" <17...@163.com> 写道:
>>>>hi
>>>>
>>>>sink 端可以通过 sink.parallelism 进行设置.
>>>>
>>>>
>>>>
>>>>-----
>>>>Best Wishes
>>>>JasonLee
>>>>--
>>>>Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re:Re:Re:flink sql cdc数据同步至mysql

Posted by "casel.chen" <ca...@126.com>.
我的场景就是简单的数据同步,没有join也没有group by,从一个mysql库同步到另一个mysql库。
上游写入数据速度很快,如果用flink sql同步的话默认只有一个并行度写,速度会跟不上,这种情况要怎么处理?
用的是flink 1.12.1 其jdbc connector还不支持sink.parallism参数

















在 2021-06-11 16:32:00,"东东" <do...@163.com> 写道:
>
>
>
>他这里列举的case是sink前发生了基于非pk的shuffle,比如说有join而且join条件不是主键,你的场景是怎样的呢?
>
>
>
>
>另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash shuffle,确保相同pk的记录发到同一个sink task。
>
>
>在 2021-06-11 15:57:29,"casel.chen" <ca...@126.com> 写道:
>>引用 Leonard Xu大佬之前的回答:
>>
>>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?
>>
>>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
>>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 
>>
>>说明加 sink.parallelism 是不行的
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2021-06-11 15:44:51,"JasonLee" <17...@163.com> 写道:
>>>hi
>>>
>>>sink 端可以通过 sink.parallelism 进行设置.
>>>
>>>
>>>
>>>-----
>>>Best Wishes
>>>JasonLee
>>>--
>>>Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re:Re:flink sql cdc数据同步至mysql

Posted by 东东 <do...@163.com>.


他这里列举的case是sink前发生了基于非pk的shuffle,比如说有join而且join条件不是主键,你的场景是怎样的呢?




另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash shuffle,确保相同pk的记录发到同一个sink task。


在 2021-06-11 15:57:29,"casel.chen" <ca...@126.com> 写道:
>引用 Leonard Xu大佬之前的回答:
>
>> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?
>
>这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
>否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 
>
>说明加 sink.parallelism 是不行的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-06-11 15:44:51,"JasonLee" <17...@163.com> 写道:
>>hi
>>
>>sink 端可以通过 sink.parallelism 进行设置.
>>
>>
>>
>>-----
>>Best Wishes
>>JasonLee
>>--
>>Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re:flink sql cdc数据同步至mysql

Posted by "casel.chen" <ca...@126.com>.
引用 Leonard Xu大佬之前的回答:

> flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?

这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
否则可能导致数据乱序。 这个社区也在从 plan 推导上并解决,可以参考 https://issues.apache.org/jira/browse/FLINK-20374 <https://issues.apache.org/jira/browse/FLINK-20374> https://issues.apache.org/jira/browse/FLINK-22901 <https://issues.apache.org/jira/browse/FLINK-22901> 

说明加 sink.parallelism 是不行的














在 2021-06-11 15:44:51,"JasonLee" <17...@163.com> 写道:
>hi
>
>sink 端可以通过 sink.parallelism 进行设置.
>
>
>
>-----
>Best Wishes
>JasonLee
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:Re:flink sql cdc数据同步至mysql

Posted by JasonLee <17...@163.com>.
hi

sink 端可以通过 sink.parallelism 进行设置.



-----
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re:flink sql cdc数据同步至mysql

Posted by "casel.chen" <ca...@126.com>.
单表存量数据的确很大,但业务方不需要同步存量,只需要同步增量就行


flink sql如何修改下游sink端的并行度呢?通过sql hint?

















在 2021-06-11 11:08:58,"东东" <do...@163.com> 写道:
>1、有必要考虑其他方案了,如果是单表存量数据很大,且不说下游sink的问题,单单是snapshot阶段可能耗时过长,如果一旦失败,就只能整体重来(因为此时不能做checkpoint),任务的成功率就很值得怀疑(当然主要还看存量数据到底有多大)。另外,如果能获取全局锁还好,如果无法获取,则会锁表直到存量数据全部拷贝完毕,基本等于业务down掉。
>2、如果只是简单的insert into xxx  select xxx,就不用担心,runtime在遇到上下游并行度不一致时,如果有主键会按照主键hash的。
>
>
>在 2021-06-08 14:05:17,"casel.chen" <ca...@126.com> 写道:
>>flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
>>flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?

Re:flink sql cdc数据同步至mysql

Posted by 东东 <do...@163.com>.
1、有必要考虑其他方案了,如果是单表存量数据很大,且不说下游sink的问题,单单是snapshot阶段可能耗时过长,如果一旦失败,就只能整体重来(因为此时不能做checkpoint),任务的成功率就很值得怀疑(当然主要还看存量数据到底有多大)。另外,如果能获取全局锁还好,如果无法获取,则会锁表直到存量数据全部拷贝完毕,基本等于业务down掉。
2、如果只是简单的insert into xxx  select xxx,就不用担心,runtime在遇到上下游并行度不一致时,如果有主键会按照主键hash的。


在 2021-06-08 14:05:17,"casel.chen" <ca...@126.com> 写道:
>flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
>flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?