Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/12/25 03:00:00 UTC

[jira] [Closed] (FLINK-20763) canal format parse update record with null value get wrong result

     [ https://issues.apache.org/jira/browse/FLINK-20763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-20763.
---------------------------
    Resolution: Duplicate

> canal format parse update record with null value get wrong result
> -----------------------------------------------------------------
>
>                 Key: FLINK-20763
>                 URL: https://issues.apache.org/jira/browse/FLINK-20763
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.11.2
>            Reporter: WangRuoQi
>            Priority: Major
>         Attachments: canal_format.patch
>
>
> When I use the canal format to consume a MySQL binlog with a query like this:
> {code:java}
> select ymd,count(order_no),count(*) from order_table where status>=3 group by ymd;{code}
> I get results like this:
> {code:java}
> (20201212,10,10)
> ..
> (20201212,20,24)
> ..
> (20201212,100,130)
> ..{code}
> I am sure that when status>=3, every record has a valid order_no, yet the result shows different values for count(order_no) and count(*).
> While debugging, I found the following:
> {code:java}
> insert into order_table(id,ymd,order_no,status) values(1,20201212,null,1);
> -- +I(1,20201212,null,1)
> update order_table set order_no=123,status=3 where id=1;
> -- -U(1,20201212,123,1)  # wrong; the correct log should be -U(1,20201212,null,1)
> -- +U(1,20201212,123,3){code}
> So it appears that the canal format has a bug when parsing update records.
>  
> The source code logic is:
> {code:java}
> // org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
> } else if (OP_UPDATE.equals(type)) {
>    // "data" field is an array of row, contains new rows
>    ArrayData data = row.getArray(0);
>    // "old" field is an array of row, contains old values
>    ArrayData old = row.getArray(1);
>    for (int i = 0; i < data.size(); i++) {
>       // the underlying JSON deserialization schema always produce GenericRowData.
>       GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
>       GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
>       for (int f = 0; f < fieldCount; f++) {
>          if (before.isNullAt(f)) {
>             // not null fields in "old" (before) means the fields are changed
>             // null/empty fields in "old" (before) means the fields are not changed
>             // so we just copy the not changed fields into before
>             before.setField(f, after.getField(f));
>          }
>       }
>       before.setRowKind(RowKind.UPDATE_BEFORE);
>       after.setRowKind(RowKind.UPDATE_AFTER);
>       out.collect(before);
>       out.collect(after);
> {code}
> When a field in the old row has a null value, it is overwritten by the value from the new record. That leads the aggregation to a wrong result.
>  
> I tried to fix this bug with the following logic:
> for each field, use the old value when the old row contains this field, whether it is null or not; otherwise use the new value by default.
> I hope this bug will be fixed in a future version.
> [^canal_format.patch]
>  
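The proposed rule (key presence in "old" decides, not nullness) can be illustrated with a minimal, self-contained sketch. It is not the attached patch and does not use Flink classes: plain Maps stand in for the "data" and "old" rows, and the class and method names (CanalUpdateBeforeSketch, mergeTreatingNullAsUnchanged, mergeByKeyPresence) are hypothetical, chosen only for this illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: Maps stand in for the rows of the canal "data" and "old" arrays.
class CanalUpdateBeforeSketch {

    // Mirrors the behavior described in the report: a null in "old" is read as
    // "field not changed", so the new value is copied into the before-image.
    static Map<String, Object> mergeTreatingNullAsUnchanged(
            Map<String, Object> after, Map<String, Object> old) {
        Map<String, Object> before = new LinkedHashMap<>();
        for (String field : after.keySet()) {
            Object oldValue = old.get(field);
            before.put(field, oldValue != null ? oldValue : after.get(field));
        }
        return before;
    }

    // Mirrors the proposed rule: if "old" contains the field at all (even with a
    // null value), use the old value; only fields absent from "old" are unchanged.
    static Map<String, Object> mergeByKeyPresence(
            Map<String, Object> after, Map<String, Object> old) {
        Map<String, Object> before = new LinkedHashMap<>();
        for (String field : after.keySet()) {
            before.put(field, old.containsKey(field) ? old.get(field) : after.get(field));
        }
        return before;
    }

    public static void main(String[] args) {
        // Row after: update order_table set order_no=123,status=3 where id=1
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 1);
        after.put("ymd", 20201212);
        after.put("order_no", 123);
        after.put("status", 3);

        // "old" carries only the changed columns; order_no was null before the update.
        Map<String, Object> old = new HashMap<>();
        old.put("order_no", null);
        old.put("status", 1);

        // prints {id=1, ymd=20201212, order_no=123, status=1}
        System.out.println(mergeTreatingNullAsUnchanged(after, old));
        // prints {id=1, ymd=20201212, order_no=null, status=1}
        System.out.println(mergeByKeyPresence(after, old));
    }
}
```

The first result reproduces the -U(1,20201212,123,1) record from the report; the second gives the expected -U(1,20201212,null,1). In the real deserializer the same distinction would have to be made on the parsed JSON, before absent keys and explicit nulls are both collapsed into null row fields.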



--
This message was sent by Atlassian Jira
(v8.3.4#803005)