Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/12/25 03:00:00 UTC
[jira] [Closed] (FLINK-20763) canal format parse update record with null value get wrong result
[ https://issues.apache.org/jira/browse/FLINK-20763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-20763.
---------------------------
Resolution: Duplicate
> canal format parse update record with null value get wrong result
> -----------------------------------------------------------------
>
> Key: FLINK-20763
> URL: https://issues.apache.org/jira/browse/FLINK-20763
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.11.2
> Reporter: WangRuoQi
> Priority: Major
> Attachments: canal_format.patch
>
>
> When I use the canal format to consume the MySQL binlog with a query like this:
> {code:java}
> select ymd,count(order_no),count(*) from order_table where status>=3 group by ymd;{code}
> I get result like this:
> {code:java}
> (20201212,10,10)
> ..
> (20201212,20,24)
> ..
> (20201212,100,130)
> ..{code}
> I am sure that when status>=3 every record has a valid order_no, yet I got results with different count(order_no) and count(*).
> I found the following while debugging.
> {code:java}
> insert into order_table(id,ymd,order_no,status) values(1,20201212,null,1);
> -- +I(1,20201212,null,1)
> update order_table set order_no=123,status=3 where id=1;
> -- -U(1,20201212,123,1) # wrong: the correct row should be -U(1,20201212,null,1)
> -- +U(1,20201212,123,3){code}
> So I noticed that the canal format hits a bug when parsing update records.
>
> The source code logic is:
> {code:java}
> // org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
> } else if (OP_UPDATE.equals(type)) {
>     // "data" field is an array of rows, containing the new rows
>     ArrayData data = row.getArray(0);
>     // "old" field is an array of rows, containing the old values
>     ArrayData old = row.getArray(1);
>     for (int i = 0; i < data.size(); i++) {
>         // the underlying JSON deserialization schema always produces GenericRowData
>         GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
>         GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
>         for (int f = 0; f < fieldCount; f++) {
>             if (before.isNullAt(f)) {
>                 // not null fields in "old" (before) means the fields are changed
>                 // null/empty fields in "old" (before) means the fields are not changed
>                 // so we just copy the not changed fields into before
>                 before.setField(f, after.getField(f));
>             }
>         }
>         before.setRowKind(RowKind.UPDATE_BEFORE);
>         after.setRowKind(RowKind.UPDATE_AFTER);
>         out.collect(before);
>         out.collect(after);
>     }
> {code}
> When a field in "old" has a null value, it is overwritten by the value from the new record, which leads the aggregation to a wrong result.
>
> I tried to fix this bug with the following logic: for each field, use the old value whenever the "old" row contains that field (whether its value is null or not), and fall back to the new value otherwise.
> I hope this bug will be fixed in a future version.
> [^canal_format.patch]
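> The proposed merge rule above can be sketched in plain Java. This is a simplified, self-contained illustration of the idea only, not the actual patch or the Flink API: the class, method, and variable names here are hypothetical, and the "old" row is modeled as a map of field name to value so that field *presence* can be distinguished from a null *value*.
>
```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class CanalUpdateMerge {

    /**
     * Build the UPDATE_BEFORE row. A key that is PRESENT in oldFields with a
     * null value means the old value really was NULL and must be kept; only
     * keys that are ABSENT from oldFields are unchanged and copied from after.
     */
    static Object[] buildBefore(String[] fieldNames, Object[] after, Map<String, Object> oldFields) {
        Object[] before = new Object[after.length];
        for (int f = 0; f < fieldNames.length; f++) {
            if (oldFields.containsKey(fieldNames[f])) {
                before[f] = oldFields.get(fieldNames[f]); // may legitimately be null
            } else {
                before[f] = after[f]; // unchanged field: copy from the new row
            }
        }
        return before;
    }

    public static void main(String[] args) {
        // the update from the bug report: order_no NULL -> 123, status 1 -> 3
        String[] names = {"id", "ymd", "order_no", "status"};
        Object[] after = {1, 20201212, 123, 3};
        Map<String, Object> old = new HashMap<>();
        old.put("order_no", null); // present with null value: old value was NULL
        old.put("status", 1);      // present: old value was 1
        Object[] before = buildBefore(names, after, old);
        // the -U row now keeps the original NULL instead of copying 123
        System.out.println(Arrays.toString(before));
    }
}
```
>
> With the `isNullAt`-based check from the current source, the same input would produce -U(1,20201212,123,1), because a stored NULL is indistinguishable from an absent field once both are represented as null in the row.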
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)