Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2021/07/10 06:18:37 UTC

Flink SQL: how to keyBy CDC data on the primary key before writing to the sink

Scenario: MySQL data is synchronized to MongoDB in real time. The upstream MySQL binlog is published to a Kafka topic, with no guarantee that records sharing the same primary key land in the same partition. To make the downstream MongoDB sink persist all records for a given primary key in order, the stream needs to be keyed by the primary key; the downstream then writes to MongoDB in batches.
Question: Can Flink SQL solve this problem? If so, how should the query be written?


create table person_source (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age BIGINT
) with (
   'connector' = 'kafka',
   ......
   'format' = 'canal-json'
);


create view person_view as 
select id, ??? from person_source group by id;
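
One possible way to fill in the ??? is a per-key "last value" aggregation (a sketch, not tested; it assumes keeping only the latest name/age per id is acceptable, and uses LAST_VALUE, a built-in aggregate in the Blink planner):

create view person_view as
select
  id,
  -- LAST_VALUE keeps the most recent value observed for each key
  LAST_VALUE(name) as name,
  LAST_VALUE(age) as age
from person_source
group by id;

The group by id forces a hash repartition on the primary key, so all changelog records for a given id are handled by the same subtask, which gives the keyBy behavior described above.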


create table person_sink (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age BIGINT
) with (
   'connector' = 'mongodb',
   ......
   'format' = 'json'
);


insert into person_sink select * from person_view;
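
An alternative sketch uses Flink SQL's deduplication pattern (ROW_NUMBER over a time attribute). It assumes person_source is additionally declared with a computed processing-time column, e.g. proc_time AS PROCTIME(); proc_time, row_num, and person_latest are names introduced here purely for illustration:

-- assumes the source DDL additionally declares: proc_time AS PROCTIME()
create view person_latest as
select id, name, age
from (
  select
    *,
    -- number rows per key, newest first, by processing time
    ROW_NUMBER() over (partition by id order by proc_time desc) as row_num
  from person_source
)
where row_num = 1;  -- keep only the latest row for each id

insert into person_sink select id, name, age from person_latest;

Like the group by above, partition by id introduces a hash exchange on the primary key, so per-key ordering is preserved on the way to the sink.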