Posted to user-zh@flink.apache.org by "amenhub@163.com" <am...@163.com> on 2020/07/24 10:18:23 UTC

Re: Re: Flink 1.11 CDC-related questions

Thanks a lot! I am now watching the issue.


Best


amenhub@163.com
 
From: Leonard Xu
Sent: 2020-07-24 16:20
To: user-zh
Subject: Re: Flink 1.11 CDC-related questions
Hi amenhub
 
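I have created an issue to track this problem [1].
In the meantime, you can set the table's REPLICA IDENTITY to FULL in your PG, so that the UPDATE events Debezium captures carry the complete row information.
The command is: ALTER TABLE yourTable REPLICA IDENTITY FULL; see the Debezium documentation [2] for details.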
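
To check whether the change took effect, one option is to inspect the UPDATE events Debezium emits afterwards. The following is only a minimal Python sketch, not part of Flink or Debezium: the helper name update_has_full_before is hypothetical, and comparing the column names of before and after is an assumed heuristic for "before is complete". The sample is a trimmed copy of the event from this thread.

```python
import json


def update_has_full_before(raw_event: str) -> bool:
    """Heuristic check (an assumption, not a Debezium API): an UPDATE event
    has a complete 'before' image if 'before' is non-null and carries the
    same column names as 'after'."""
    event = json.loads(raw_event)
    # With the schema included, the change data sits under "payload";
    # otherwise the event itself is the payload.
    payload = event.get("payload", event)
    if payload.get("op") != "u":
        raise ValueError("not an update event")
    before = payload.get("before")
    after = payload.get("after") or {}
    return before is not None and set(before) == set(after)


# Trimmed copy of the event reported in this thread: "before" is null.
sample = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 2, "name": "liushimin", "age": "24",
                  "sex": "man", "phone": "155555555"},
        "op": "u",
    }
})

print(update_has_full_before(sample))  # False
```

An event that still returns False after the ALTER TABLE would suggest the replica identity change has not been applied to that table.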
 
Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18700
[2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html
 
> On 2020-07-23, at 09:14, amenhub@163.com wrote:
>
> Thanks to both @Leonard and @Jark for the answers!
>
>
>
> amenhub@163.com
>
> From: Jark Wu
> Sent: 2020-07-22 23:56
> To: user-zh
> Subject: Re: Flink 1.11 CDC-related questions
> Hi,
>
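> This is a known issue. Debezium currently does not guarantee exactly the same data format across different databases; for example, when syncing UPDATE messages from PG, the before and
> after fields are not complete.
> This will be fixed in a later release.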
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 21:07, Leonard Xu <xb...@gmail.com> wrote:
>
>> Hello,
>>
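>> The code threw an NPE while setting the rowKind for the before record; before should normally not be null.
>> It looks like a problem with your data: an update changelog whose before is null.
>> That is not valid, because without the before data the after data cannot be processed.
>> If you confirm it is dirty data, you can enable ignore-parse-errors to skip it [1].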
>>
>> Best regards
>> Leonard
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>
>> {
>>    "payload": {
>>        "before": null,
>>        "after": {
>>            "id": 2,
>>            "name": "liushimin",
>>            "age": "24",
>>            "sex": "man",
>>            "phone": "155555555"
>>        },
>>        "source": {
>>            "version": "1.2.0.Final",
>>            "connector": "postgresql",
>>            "name": "postgres",
>>            "ts_ms": 1595409754151,
>>            "snapshot": "false",
>>            "db": "postgres",
>>            "schema": "public",
>>            "table": "person",
>>            "txId": 569,
>>            "lsn": 23632344,
>>            "xmin": null
>>        },
>>        "op": "u",
>>        "ts_ms": 1595409754270,
>>        "transaction": null
>>    }
>> }
>>
>>> On 2020-07-22, at 17:34, amenhub@163.com wrote:
>>>
>>>
>> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source"
:{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>>
>>