Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2021/07/10 06:18:37 UTC
Keying Flink SQL CDC data by primary key before writing to the sink
Scenario: MySQL data is synchronized to MongoDB in real time. The upstream MySQL binlog is published to a single Kafka topic, with no guarantee that records sharing a primary key land in the same partition. To make the downstream MongoDB sink persist all records of a given primary key in order, the stream first has to be keyed by the primary key; the records are then written to MongoDB downstream in batches.
Question: can Flink SQL solve this? If so, how should the SQL be written? My attempt is below (a sketch of the closest pattern I have found follows the SQL).
create table person_source (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    age BIGINT
) with (
    'connector' = 'kafka',
    ......
    'format' = 'canal-json'
);
create view person_view as
    select id, ??? from person_source group by id;
create table person_sink (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    age BIGINT
) with (
    'connector' = 'mongodb',
    ......
    'format' = 'json'
);
insert into person_sink select * from person_view;
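
For what it's worth, the closest built-in pattern I have found for "keep the latest row per key" is a group aggregation with the LAST_VALUE aggregate function. This is only a sketch of the intent, not a verified solution: I am not sure LAST_VALUE is legal on a canal-json changelog source, and the column list is simply the one from the tables above.

create view person_view as
select
    id,
    LAST_VALUE(name) as name,  -- latest name observed for this id
    LAST_VALUE(age) as age     -- latest age observed for this id
from person_source
group by id;                   -- hash-shuffles by the primary key, so all
                               -- records for one id meet in the same task

Either way, the goal is that every record for a given id flows through the same task so the MongoDB sink sees them in order. Recent Flink versions also seem to have a 'table.exec.sink.keyed-shuffle' option that adds a keyed shuffle in front of a sink with a declared primary key, but whether it is available depends on the release, so please check the docs for your version.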