Posted to issues@flink.apache.org by "WangRuoQi (Jira)" <ji...@apache.org> on 2020/12/24 16:14:00 UTC

[jira] [Updated] (FLINK-20763) canal format parse update record with null value get wrong result

     [ https://issues.apache.org/jira/browse/FLINK-20763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

WangRuoQi updated FLINK-20763:
------------------------------
    Description: 
When I use the canal format to consume a MySQL binlog with a query like this:
{code:java}
select ymd,count(order_no),count(*) from order_table where status>=3 group by ymd;{code}
I get results like this:
{code:java}
(20201212,10,10)
..
(20201212,20,24)
..
(20201212,100,130)
..{code}
I am sure that when status>=3, every record has a valid order_no, yet I got results where count(order_no) and count(*) differ.

I found the following while debugging.
{code:java}
insert into order_table(ymd,order_no,status) values(20201212,null,1);
-- +I(20201212,null,1)
update order_table set order_no=123,status=3 where id=1;
-- -U(20201212,123,1)
-- +U(20201212,123,3){code}
So I noticed that the canal format has a bug when parsing update records.
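For context, a minimal sketch of the canal JSON message for the update above (the database/table names are illustrative, not from the report): in canal's format, the "old" array carries only the changed columns, and a column that appears in "old" with a null value means its previous value was NULL. The wire format can therefore distinguish "unchanged" from "was NULL"; the problem is that the parsing logic below loses that distinction.
{code:json}
{
  "database": "test_db",
  "table": "order_table",
  "type": "UPDATE",
  "data": [
    { "id": "1", "ymd": "20201212", "order_no": "123", "status": "3" }
  ],
  "old": [
    { "order_no": null, "status": "1" }
  ]
}
{code}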

 

The source code logic is:
{code:java}
} else if (OP_UPDATE.equals(type)) {
   // "data" field is an array of row, contains new rows
   ArrayData data = row.getArray(0);
   // "old" field is an array of row, contains old values
   ArrayData old = row.getArray(1);
   for (int i = 0; i < data.size(); i++) {
      // the underlying JSON deserialization schema always produces GenericRowData.
      GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
      GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
      for (int f = 0; f < fieldCount; f++) {
         if (before.isNullAt(f)) {
            // not null fields in "old" (before) means the fields are changed
            // null/empty fields in "old" (before) means the fields are not changed
            // so we just copy the not changed fields into before
            before.setField(f, after.getField(f));
         }
      }
      before.setRowKind(RowKind.UPDATE_BEFORE);
      after.setRowKind(RowKind.UPDATE_AFTER);
      out.collect(before);
      out.collect(after);
   }
}
{code}
When a field in the old row holds a null value, it is overwritten by the corresponding value from the new row. That leads the aggregation to a wrong result.

 

I tried to fix this bug with the following logic: for each field, use the old value whenever the old row contains that field, whether its value is null or not; otherwise fall back to the new value.
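This idea can be sketched with a simplified model (plain arrays and a map stand in for GenericRowData and the parsed "old" row; the class and method names below are hypothetical, not Flink's API). The buggy variant decides by null-ness, the fixed variant decides by presence:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class CanalOldFieldMerge {

    // Buggy merge: a null entry in "old" is treated as "column unchanged",
    // so a column whose previous value really was NULL gets overwritten
    // with the new value (123 in the report above).
    static Object[] mergeBuggy(Object[] after, Map<Integer, Object> old) {
        Object[] before = after.clone();
        for (int f = 0; f < after.length; f++) {
            if (old.get(f) != null) {
                before[f] = old.get(f);
            }
            // old.get(f) == null for both "absent" and "was NULL" -> keep after[f]
        }
        return before;
    }

    // Fixed merge: decide by *presence* of the column in "old", not by
    // null-ness, so "present but null" correctly restores a NULL old value.
    static Object[] mergeFixed(Object[] after, Map<Integer, Object> old) {
        Object[] before = after.clone();
        for (int f = 0; f < after.length; f++) {
            if (old.containsKey(f)) {
                before[f] = old.get(f);
            }
        }
        return before;
    }

    public static void main(String[] args) {
        // after = (ymd, order_no, status) = (20201212, 123, 3)
        Object[] after = {20201212, 123, 3};
        // "old" carries only the changed columns: order_no was NULL, status was 1
        Map<Integer, Object> old = new HashMap<>();
        old.put(1, null);
        old.put(2, 1);

        System.out.println("buggy before.order_no = " + mergeBuggy(after, old)[1]);
        System.out.println("fixed before.order_no = " + mergeFixed(after, old)[1]);
    }
}
{code}
With the data from the report, the buggy merge reconstructs the UPDATE_BEFORE order_no as 123 while the fixed merge restores it to null, which is what the -U(20201212,123,1) vs. the expected -U(20201212,null,1) above shows.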

I hope this bug will be fixed in a future version.

[^canal_format.patch]

 

> canal format parse update record with null value get wrong result
> -----------------------------------------------------------------
>
>                 Key: FLINK-20763
>                 URL: https://issues.apache.org/jira/browse/FLINK-20763
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.11.2
>            Reporter: WangRuoQi
>            Priority: Major
>         Attachments: canal_format.patch
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)