Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2022/08/21 02:49:29 UTC

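How can Flink CDC convert captured change records to Canal JSON and write them to Kafka downstream?

Flink CDC produces Debezium-format records (I am using JsonDebeziumDeserializationSchema). How can I convert them to Canal JSON for output? Is there an example or some key code I could look at? Thanks!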
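A minimal sketch of the conversion, assuming the JSON strings produced by JsonDebeziumDeserializationSchema without the schema part, i.e. the usual Debezium envelope with before, after, op, ts_ms and source.db / source.table / source.ts_ms. It maps that envelope onto the commonly used Canal JSON fields (data, old, type, database, table, es, ts, isDdl, pkNames). It is plain Python so it can be tested outside Flink; in a job the same logic would sit in a map function or UDF. The function name debezium_to_canal and the pk_names parameter are hypothetical, treating snapshot reads (op "r") as INSERTs is an assumption, and Canal's mysqlType/sqlType fields are left out because this JSON carries no column types.

```python
import json
import time
from typing import List, Optional

# Debezium "op" codes mapped to Canal "type" values.
# Assumption: snapshot reads ("r") are reported to Canal consumers as INSERTs.
_OP_TO_CANAL_TYPE = {"c": "INSERT", "r": "INSERT", "u": "UPDATE", "d": "DELETE"}


def debezium_to_canal(event_json: str, pk_names: Optional[List[str]] = None) -> str:
    """Convert one Debezium JSON change event into a Canal-style JSON string.

    Hypothetical helper: mysqlType/sqlType are omitted because the schemaless
    Debezium JSON carries no column types.
    """
    event = json.loads(event_json)
    canal_type = _OP_TO_CANAL_TYPE.get(event.get("op"))
    if canal_type is None:
        raise ValueError(f"unsupported Debezium op: {event.get('op')!r}")
    before = event.get("before") or {}
    after = event.get("after") or {}
    source = event.get("source") or {}

    if canal_type == "DELETE":
        # A deleted row only has a "before" image.
        data, old = [before], None
    elif canal_type == "UPDATE":
        # Canal's "old" lists only the changed columns, with their previous values.
        data = [after]
        old = [{k: v for k, v in before.items() if after.get(k) != v}]
    else:
        data, old = [after], None

    ts = event.get("ts_ms")
    message = {
        "data": data,
        "old": old,
        "type": canal_type,
        "database": source.get("db"),
        "table": source.get("table"),
        "es": source.get("ts_ms"),  # change time recorded by the source database
        "ts": ts if ts is not None else int(time.time() * 1000),
        "isDdl": False,
        "pkNames": pk_names,
    }
    return json.dumps(message, ensure_ascii=False)
```

For an update, old keeps only the columns whose values changed, which mirrors how Canal reports updates; unchanged columns appear only in data.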

Re:Re:Re: How can Flink CDC convert captured change records to Canal JSON and write them to Kafka downstream?

Posted by Xuyang <xy...@163.com>.
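Hi, the documentation [1] describes a pipeline like MySQL (Canal JSON) -> Kafka -> Flink -> other DB; it focuses mainly on Flink reading the Canal format.

You will need to implement the conversion in between yourself. In a UDF you can connect to MongoDB in the open() method to fetch the schema information, because UDFs currently have no visibility into the catalog.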
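The open()/eval() split suggested above can be sketched in plain Python as follows. The class only mimics the lifecycle of a Flink scalar UDF and is not Flink code: the name SchemaProjector and the schema_loader callable are hypothetical, and in a real job open() would query MongoDB (or another schema source) instead of calling a stub. The idea it illustrates is that the field list is loaded once per function instance and then used to give every schemaless document the same set of columns.

```python
import json
from typing import Callable, List, Optional


class SchemaProjector:
    """Hypothetical stand-in for a Flink scalar UDF (not the Flink API).

    open() loads the collection's field list once; eval() projects each
    schemaless MongoDB document onto that list, filling absent fields with null.
    """

    def __init__(self, db: str, coll: str,
                 schema_loader: Callable[[str, str], List[str]]):
        self._db = db
        self._coll = coll
        # Assumption: in a real job this would query MongoDB or a schema store.
        self._schema_loader = schema_loader
        self._fields: Optional[List[str]] = None

    def open(self) -> None:
        # Called once before any eval(), like a UDF's open() in Flink.
        self._fields = self._schema_loader(self._db, self._coll)

    def eval(self, full_document: str) -> str:
        if self._fields is None:
            raise RuntimeError("open() must be called before eval()")
        doc = json.loads(full_document)
        # Every output row gets exactly the schema's columns, in schema order;
        # fields not in the schema are dropped.
        row = {name: doc.get(name) for name in self._fields}
        return json.dumps(row, ensure_ascii=False)


if __name__ == "__main__":
    projector = SchemaProjector("mgdb", "customers",
                                lambda db, coll: ["_id", "name", "address"])
    projector.open()
    print(projector.eval('{"_id": "1", "name": "a", "extra": 5}'))
    # prints {"_id": "1", "name": "a", "address": null}
```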

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format

--

    Best!
    Xuyang

On 2022-08-23 08:55:44, "casel.chen" <ca...@126.com> wrote:

Re:Re: How can Flink CDC convert captured change records to Canal JSON and write them to Kafka downstream?

Posted by "casel.chen" <ca...@126.com>.
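The data flow is mongodb --> flink cdc --> kafka (canal json).
The JSON string that Flink CDC produces from the MongoDB oplog looks like [1] below, while the downstream side needs to consume Canal JSON messages from Kafka. Does that mean I have to implement the format conversion in between myself?
But the MongoDB oplog carries no schema information, and it has nothing corresponding to Canal's old field. How should that information be converted?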
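For the MongoDB event shape shown in [1], where fullDocument and documentKey arrive as JSON-encoded strings and ns carries the database and collection, a conversion could look roughly like the Python sketch below. It is a hypothetical helper, not part of Flink CDC: the operationType mapping is an assumption, it unwraps the $oid/$date extended-JSON wrappers, and because the change event carries no pre-image it cannot fill Canal's old field with real previous values, so for updates it emits an empty old entry. How downstream consumers should interpret that is a design decision the sketch leaves open.

```python
import json
from typing import Any, Dict, Optional

# Assumed mapping from MongoDB operationType to Canal "type".
_MONGO_OP_TO_CANAL = {
    "insert": "INSERT",
    "update": "UPDATE",
    "replace": "UPDATE",
    "delete": "DELETE",
}


def _unwrap(value: Any) -> Any:
    """Recursively replace extended-JSON wrappers such as {"$oid": ...} and
    {"$date": ...} with their plain values."""
    if isinstance(value, dict):
        if len(value) == 1:
            key = next(iter(value))
            if key in ("$oid", "$date"):
                return value[key]
        return {k: _unwrap(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_unwrap(v) for v in value]
    return value


def mongo_event_to_canal(event_json: str) -> Optional[str]:
    """Convert one change event shaped like [1] into a Canal-style message.
    Returns None for operation types that are not row changes."""
    event = json.loads(event_json)
    canal_type = _MONGO_OP_TO_CANAL.get(event.get("operationType"))
    if canal_type is None:
        return None
    # fullDocument is absent for deletes; fall back to the key document.
    raw_doc = event.get("fullDocument") or event["documentKey"]
    row: Dict[str, Any] = _unwrap(json.loads(raw_doc))
    ns = event.get("ns") or {}
    message = {
        "data": [row],
        # The change event has no pre-image, so no old values can be listed.
        "old": [{}] if canal_type == "UPDATE" else None,
        "type": canal_type,
        "database": ns.get("db"),
        "table": ns.get("coll"),
        "es": (event.get("source") or {}).get("ts_ms"),
        "isDdl": False,
        "pkNames": ["_id"],
    }
    return json.dumps(message, ensure_ascii=False)
```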

Also, when I use the Flink SQL below to send Canal JSON data to Kafka, the output is incomplete [2]; it is not a standard Canal JSON message [3]. Is this a known issue?

CREATE TABLE mongo_customers (
  _id STRING,
  customer_id INT,
  name STRING,
  address STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = 'mongopw',
  'database' = 'mgdb',
  'collection' = 'customers'
);

CREATE TABLE kafka_customers (
  _id STRING,
  customer_id INT,
  name STRING,
  address STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json'
);

INSERT INTO kafka_customers SELECT * FROM mongo_customers;
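Judging from output [2], Flink's canal-json sink writes only the data and type fields, and the Flink Canal format documentation (the page linked as [3]) states that Flink cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message, so updates are encoded as DELETE and INSERT messages. If downstream consumers need the remaining Canal fields, one option is a post-processing step that adds them. The Python sketch below is hypothetical: enrich_flink_canal is not a Flink API, database, table and pk_names are assumed to come from the job configuration, ts is the processing time of this step rather than the original change time, and mysqlType/sqlType are not reconstructed.

```python
import json
import time
from typing import List, Optional


def enrich_flink_canal(message_json: str, database: str, table: str,
                       pk_names: List[str], es: Optional[int] = None) -> str:
    """Add Canal metadata fields missing from a message like [2].

    database/table/pk_names are supplied by the caller (assumption: known from
    the job configuration). Fields already present are left untouched.
    """
    msg = json.loads(message_json)
    defaults = {
        "old": None,
        "database": database,
        "table": table,
        "pkNames": pk_names,
        "isDdl": False,
        "es": es,
        # Processing time of this step, not the original change time.
        "ts": int(time.time() * 1000),
    }
    for key, value in defaults.items():
        msg.setdefault(key, value)
    return json.dumps(msg, ensure_ascii=False)
```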

[1]
{
"_id": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"copyingData\": true}",
"operationType": "insert",
"fullDocument": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"_class\": \"com.huifu.uqp.dal.mongo.model.TopTransOrder\", \"reqSeqId\": \"123\", \"ordId\": \"1440509760709632000\", \"outTransId\": \"123\", \"merOrdId\": \"123\", \"hfSeqId\": \"123\", \"partyOrderId\": \"123\", \"bankSeqId\": \"123\", \"orgOrdId\": \"123\", \"orgTermOrdId\": \"123\", \"orgHuifuSeqId\": \"123\", \"transDate\": \"20210913\", \"productId\": \"app8\", \"serviceId\": \"6767639\", \"topAgentId\": \"123\", \"belongAgentId\": \"123\", \"chainsId\": \"123\", \"huifuId\": \"666684552350\", \"transMajorCategory\": \"123\", \"consoleActualPayChannel\": \"123\", \"consolePayType\": \"123\", \"consolePreAuthFlag\": \"123\", \"consoleSubsidyFlag\": \"123\", \"consoleDcType\": \"123\", \"consoleIsFq\": \"123\", \"consoleAcctDivFlag\": \"123\", \"actualPayChannel\": \"123\", \"payChannel\": \"123\", \"transType\": \"123\", \"payType\": \"123\", \"dcType\": \"123\", \"isAcctDiv\": \"123\", \"isDelayAcct\": \"123\", \"creditType\": \"123\", \"devsId\": \"123\", \"ordAmt\": 123.32, \"feeAmt\": 123.0, \"actOrdAmt\": 123.0, \"actualRefAmt\": 123.0, \"refAmt\": 123.0, \"refFeeAmt\": 123.0, \"subsidyAmt\": 123.0, \"subsidyRefAmt\": 123.0, \"payCardId\": \"123\", \"feeRecType\": \"123\", \"feeFlag\": \"123\", \"transStat\": \"S\", \"createTime\": {\"$date\": 1632279264987}, \"transFinishTime\": \"123\", \"kafkaTime\": \"123\", \"tableName\": \"123\", \"offset\": \"123\", \"recordVersion\": \"123\", \"sign\": \"123\"}",
"source": {
"ts_ms": 0,
"snapshot": "true"
},
"ns": {
"db": "amp_test",
"coll": "TopTransOrder"
},
"to": null,
"documentKey": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}}",
"updateDescription": null,
"clusterTime": null,
"txnNumber": null,
"lsid": null
}

[2]
{"data":[{"_id":"614a9b3769736f5fcc492613","id":null,"reqSeqId":"123","ordId":"1440510124339011584","outTransId":"123","merOrdId":"123","hfSeqId":"123","partyOrderId":"123","bankSeqId":"123","orgOrdId":"123","orgTermOrdId":"123","orgHuifuSeqId":"123","transDate":"20210913","productId":"app3","serviceId":"6767679","topAgentId":"123","belongAgentId":"123","chainsId":"123","huifuId":"66668455531","transMajorCategory":"123","consoleActualPayChannel":"123","consolePayType":"123","consolePreAuthFlag":"123","consoleSubsidyFlag":"123","consoleDcType":"123","consoleAcctDivFlag":"123","actualPayChannel":"123","payChannel":"123","transType":"123","payType":"123","dcType":"123","isAcctDiv":"123","isDelayAcct":"123","creditType":"123","devsId":"123","ordAmt":123.32,"feeAmt":123,"actOrdAmt":123,"actualRefAmt":123,"refAmt":123,"refFeeAmt":123,"subsidyAmt":123,"subsidyRefAmt":123,"payCardId":"123","feeRecType":"123","feeFlag":"123","transStat":"S","transFinishTime":"123","tableName":"123","offset":"123","recordVersion":"123","sign":"123","synModifyTime":null,"merName":null,"bankName":null,"bankRespDesc":null,"bagentId":null,"bankId":null,"cashRespDesc":null,"reqDate":null,"accSplitBunch":null,"acctId":null,"fqFeeAmt":null,"payCardIdEnc":null,"goodsDesc":null,"remark":null,"synTtlDate":null,"outOrdId":null,"devType":null,"feeHuifuId":null,"feeAcctId":null,"orgTransDate":null,"orgOrdAmt":null,"orgCreateTime":null,"userType":null,"userId":null,"userIdExt":null,"settleAmt":null,"refCnt":null,"consoleCountSum":null,"topConsolePayType":null,"orgMerOrdId":null,"feeAllowanceFlag":null,"correctStat":null,"addedOrgFeeAmt":null,"discountFeeAmt":null,"acctFinishTime":null,"pospSeqId":null,"outOrderId":null,"cashTransId":null,"orgPayType":null,"orgPayChannel":null,"branch1HuifuId":null,"branch2HuifuId":null,"branch3HuifuId":null,"branch4HuifuId":null,"branch5HuifuId":null,"branchHuifuId":null,"level":null,"branchChannelId":null,"orgFeeAmt":null,"orgConsoleIsFq":null,"orgCreditType":null,"fqMerDiscountFlag":null,"payScene":null,"labels":null,"orgTransType":null,"orgFeeRecType":null,"orgFeeFlag":null,"orgDiscountFeeAmt":null,"merOperId":null,"operType":null,"batchId":null,"authNo":null,"refNum":null,"bankMerId":null,"bankMerName":null,"posMerId":null,"posMerName":null,"acqrInstId":null,"doubleExempt":null,"pnrDevId":null,"posTermId":null,"realPayType":null,"channelFinishTime":null,"transRefundBankId":null,"transRefundBankName":null,"orgRealPayType":null,"orgDevsId":null,"merPriv":null,"transRefundOutOrdId":null,"orgHfSeqId":null,"synMode":null,"cloudPay":null,"terminalReqDate":null,"terminalPayChannel":null,"huifuFstOrg":null,"huifuSecOrg":null,"huifuThdOrg":null,"huifuForOrg":null,"huifuSales":null,"partnerBd":null,"organizationId":null,"upperOrgId":null,"merOrg":null,"partnerInnerFstOrg":null,"partnerInnerSecOrg":null,"partnerInnerThdOrg":null,"partnerFstOrg":null,"partnerSecOrg":null,"partnerThdOrg":null,"collectMerFstOrg":null,"collectMerSecOrg":null,"collectMerThdOrg":null,"collectMerForOrg":null,"collectMerFivOrg":null,"collectMerSixOrg":null,"fullPath":null}],"type":"INSERT"}

[3]
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format

On 2022-08-22 22:57:04, "Xuyang" <xy...@163.com> wrote:

Re: How can Flink CDC convert captured change records to Canal JSON and write them to Kafka downstream?

Posted by Xuyang <xy...@163.com>.
Hi, is your requirement "Debezium data" -> Flink -> "Canal"? If so, you could try using a UDF [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/