Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2021/07/07 14:42:12 UTC

How to filter canal json format data by operation type

Use case: we use canal to write the MySQL binlog to Kafka, and then want to use Flink to consume the Kafka data, filter out the delete operations, and write the remaining records to the file system for historical data archiving.
I checked the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata
{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter","weight":"5.18"}],"database":"inventory","es":1589373560000,"id":9,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.15"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products","ts":1589373560798,"type":"UPDATE"}
CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);
With the available metadata I can only get the original database, table, sql-type, pk-names, and ingestion-timestamp fields, but not the type field that indicates the operation type. Is there any other way to do this?





Re: How to filter canal json format data by operation type

Posted by Zhiwen Sun <pe...@gmail.com>.
First consume the raw canal Kafka topic with the json or raw format, filter out the delete records and write the rest to a new Kafka topic, then build a canal-json table on top of the new topic to land the data.
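A rough sketch of what that pipeline could look like in SQL; the topic names, group id, and the row type of the data column below are assumptions based on the sample message above, not part of the original reply:

-- Step 1: read the raw canal topic with the plain json format, so the canal
-- envelope fields (including `type`) become ordinary columns.
CREATE TABLE canal_raw (
  `data`     ARRAY<ROW<id STRING, name STRING, description STRING, weight STRING>>,
  `old`      ARRAY<MAP<STRING, STRING>>,
  `database` STRING,
  `table`    STRING,
  `pkNames`  ARRAY<STRING>,
  `sqlType`  MAP<STRING, INT>,
  `es`       BIGINT,
  `ts`       BIGINT,
  `type`     STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'canal_raw_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'archive_filter',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Step 2: a sink table with the same schema over a new topic (the default LIKE
-- behavior copies schema and options, with 'topic' overridden), plus a job that
-- drops the DELETE changes.
CREATE TABLE canal_without_delete
WITH ('topic' = 'canal_without_delete_topic')
LIKE canal_raw;

INSERT INTO canal_without_delete
SELECT * FROM canal_raw WHERE `type` <> 'DELETE';

-- Step 3: define a canal-json table like the one in the original post, but with
-- 'topic' = 'canal_without_delete_topic', and archive from that table.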

Zhiwen Sun



On Wed, Jul 7, 2021 at 10:51 PM JasonLee <17...@163.com> wrote:

> hi
>
>
> The last field, type, is the operation type. Just filter out DELETE.
>
>
> Best
> JasonLee

Re: How to filter canal json format data by operation type

Posted by JasonLee <17...@163.com>.
hi


The last field, type, is the operation type. Just filter out DELETE.
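For reference, if the topic is declared with the plain json format so that type is mapped to a normal STRING column (the table name canal_raw below is only an illustration), the filter is a single WHERE clause:

-- keep INSERT and UPDATE changes, drop DELETE
SELECT * FROM canal_raw WHERE `type` <> 'DELETE';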


Best
JasonLee





Re: How to filter canal json format data by operation type

Posted by 东东 <do...@163.com>.
Just model it on CanalJsonFormatFactory and implement your own DeserializationFormatFactory, filtering out the Delete operations during deserialization.
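Once such a factory is on the classpath and registered via Java SPI (an entry in META-INF/services/org.apache.flink.table.factories.Factory), the DDL from the original post only needs its format option switched to the new identifier. The identifier canal-json-skip-delete below is a made-up name for illustration:

CREATE TABLE KafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json-skip-delete'  -- hypothetical custom format identifier
);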

