Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/27 09:40:59 UTC

[GitHub] [iceberg] kingeasternsun commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

kingeasternsun commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-1023020791


   > I merged this PR and did some testing. I used Flink CDC to consume the binlog and write to an Iceberg table, then ran some streaming Flink SQL queries and compared the results with the same query against the original MySQL table. The results did not match.
   > 
   > After some debugging I found 2 issues.
   > 
   > 1. When writing CDC data to Iceberg, Flink ignores UPDATE_BEFORE and treats UPDATE_AFTER as a retract message, which is not correct. The relevant code is as follows:
   > 
   > ```
   > // org.apache.iceberg.flink.sink.BaseDeltaTaskWriter
   > switch (row.getRowKind()) {
   >   case INSERT:
   >   case UPDATE_AFTER:
   >     if (upsert) {
   >       writer.delete(row);
   >     }
   >     writer.write(row);
   >     break;
   > 
   >   case UPDATE_BEFORE:
   >     if (upsert) {
   >       break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
   >     }
   >     writer.delete(row);
   >     break;
   >   case DELETE:
   >     writer.delete(row);
   >     break;
   > 
   >   default:
   >     throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
   > }
   > ```
   > 
   > 2. When only delete operations occur during the period of one snapshot, the Iceberg snapshot contains only equality delete files and no data file (the data file is deleted). Flink then ignores the equality delete files and misses all -D records. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.
   > 
   > After fixing the above 2 issues, the test passed.
   
   
   
   > BaseDeltaTaskWriter
   
   I think this code would be clearer if written like this:
   ```java
     public void write(RowData row) throws IOException {
       RowDataDeltaWriter writer = route(row);
   
       switch (row.getRowKind()) {
         case INSERT:
           if (upsert) {
             writer.delete(row);
           }
           writer.write(row);
           break;
         case UPDATE_AFTER:
           writer.write(row);
           break;
         case UPDATE_BEFORE:
         case DELETE:
           writer.delete(row);
           break;
   
         default:
           throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
       }
     }
   ```
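
   To make the intended semantics easier to check, here is a rough, self-contained sketch that replays a small changelog through the same switch against an in-memory list. Everything in it is an assumption for illustration only: `DeltaWriterSketch`, `Kind`, `Row`, `row`, `apply` and `deleteByKey` are hypothetical stand-ins, not Flink's `RowKind`/`RowData` or Iceberg's `RowDataDeltaWriter`, and the "delete" is simply "remove every row with the same key", which is much simpler than real equality deletes.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class DeltaWriterSketch {
     /** Local stand-in for Flink's RowKind (hypothetical, not the real class). */
     public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

     /** Minimal row: a change kind, an equality key, and a payload. */
     public static final class Row {
       final Kind kind;
       final int id;
       final String value;

       Row(Kind kind, int id, String value) {
         this.kind = kind;
         this.id = id;
         this.value = value;
       }
     }

     public static Row row(Kind kind, int id, String value) {
       return new Row(kind, id, value);
     }

     /** Replays a changelog into a toy table and returns its rows as "id=value". */
     public static List<String> apply(boolean upsert, Row... changelog) {
       List<Row> table = new ArrayList<>();
       for (Row row : changelog) {
         switch (row.kind) {
           case INSERT:
             if (upsert) {
               deleteByKey(table, row.id); // upsert: drop any earlier row with this key
             }
             table.add(row);
             break;
           case UPDATE_AFTER:
             table.add(row); // the old image was already removed by UPDATE_BEFORE
             break;
           case UPDATE_BEFORE:
           case DELETE:
             deleteByKey(table, row.id);
             break;
           default:
             throw new UnsupportedOperationException("Unknown row kind: " + row.kind);
         }
       }
       List<String> result = new ArrayList<>();
       for (Row r : table) {
         result.add(r.id + "=" + r.value);
       }
       return result;
     }

     /** Toy equality delete: removes every row whose key matches. */
     private static void deleteByKey(List<Row> table, int id) {
       table.removeIf(r -> r.id == id);
     }

     public static void main(String[] args) {
       // Insert, then update: only the new image should remain.
       System.out.println(apply(true,
           row(Kind.INSERT, 1, "a"),
           row(Kind.UPDATE_BEFORE, 1, "a"),
           row(Kind.UPDATE_AFTER, 1, "b")));
       // Insert, then delete: the table should end up empty.
       System.out.println(apply(true,
           row(Kind.INSERT, 2, "x"),
           row(Kind.DELETE, 2, "x")));
     }
   }
   ```

   In this toy model the update relies on the UPDATE_BEFORE record arriving before UPDATE_AFTER, so it only illustrates the case where the source emits both images.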


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


