Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/10 02:46:51 UTC

[GitHub] [iceberg] hameizi opened a new pull request #3095: Flink: flink read iceberg upsert data use streaming mode

hameizi opened a new pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095


   Currently, when Flink reads an Iceberg table in streaming mode, it ignores `overwrite` snapshots, so users cannot read deleted data in real time.
   This PR first emits the equality deletes as delete rows (-U), and then emits the insert rows (+I) obtained by joining the added data with the position deletes through the function applyPosdelete. Flink can then maintain a primary-key table when reading from an Iceberg primary-key table.
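
As a rough illustration of the emission order described above (not the PR's actual code), the sketch below assumes rows are plain strings and position deletes are a set of row positions within the added data. The names `SnapshotChangelogSketch` and `emit` are hypothetical. It emits the equality deletes first as delete rows, then the added rows that survive the position deletes as insert rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: models one snapshot's changes as simple strings.
class SnapshotChangelogSketch {

  // eqDeletes:  rows removed by equality delete files (assumed already resolved to rows)
  // addedRows:  rows of the newly added data file, indexed by position
  // posDeletes: positions in addedRows removed by position delete files
  static List<String> emit(List<String> eqDeletes, List<String> addedRows, Set<Integer> posDeletes) {
    List<String> out = new ArrayList<>();

    // Step 1: emit every equality delete as a delete row.
    for (String row : eqDeletes) {
      out.add("DELETE " + row);
    }

    // Step 2: emit added rows as inserts, skipping positions that were deleted.
    for (int pos = 0; pos < addedRows.size(); pos++) {
      if (!posDeletes.contains(pos)) {
        out.add("INSERT " + addedRows.get(pos));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> changes = emit(List.of("1,qq"), List.of("1,aa", "2,bb"), Set.of(1));
    changes.forEach(System.out::println);
  }
}
```

Emitting deletes before inserts matters in this sketch: a downstream operator keyed on the primary key sees the old row retracted before the new row arrives.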


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] xinbinhuang edited a comment on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
xinbinhuang edited a comment on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-959847806


   Hi @hameizi, thanks for the great work! I'm really interested in this patch to support `overwrite` snapshots. In my company, 90% of the writes to Iceberg are `overwrite` by Spark, which makes it really hard or impossible to stream-read these tables in Flink. I wonder whether this patch requires Flink 1.13, or whether it also works with 1.12?
   
   @openinx do you know whether this PR can make it into the next release? If so, roughly when will the next release be, or does it depend on the progress of the priority 1 items in the roadmap?


[GitHub] [iceberg] xianyouQ commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
xianyouQ commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-1016153897


   I merged this PR and did some testing. I used Flink CDC to consume the binlog and write to an Iceberg table, then ran some Flink SQL streaming queries and compared them with the same queries on the original MySQL tables; the results did not match.

   After some debugging I found 2 issues.

   1. When Flink writes CDC data to Iceberg, it ignores UPDATE_BEFORE and treats UPDATE_AFTER as a retract message, which is not correct. The relevant code is as follows:

   ```java
   // org.apache.iceberg.flink.sink.BaseDeltaTaskWriter
   switch (row.getRowKind()) {
     case INSERT:
     case UPDATE_AFTER:
       if (upsert) {
         writer.delete(row);
       }
       writer.write(row);
       break;

     case UPDATE_BEFORE:
       if (upsert) {
         break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
       }
       writer.delete(row);
       break;
     case DELETE:
       writer.delete(row);
       break;

     default:
       throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
   }
   ```

   2. When only delete operations occur during a snapshot, the Iceberg snapshot contains only equality delete files and no data file (the data file is deleted), so Flink ignores the equality delete files and misses all -D rows. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.

   After fixing the above 2 issues, the test passed.


[GitHub] [iceberg] MOBIN-F commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
MOBIN-F commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-950717092


   > > > @MOBIN-F That's true; this PR translates an update into -D and +I.
   > > 
   > > 
   > > The query returns the record (1,qq), but this record is from before the update. Is that right?
   > 
   > If you don't query the op of the data, you will get only one result, (1,aa).
   
   Sorry, I don't understand how to get only one result, (1,aa). Could you tell me how to do that?


[GitHub] [iceberg] hameizi commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-966879680


   > Thanks. After testing, I found the PR problematic: there is duplicate data in batch reads, it is not suitable for upsert streams, and the join scenario also has problems.
   > I'm trying to implement this feature according to the design documentation.
   
   This PR is not related to batch mode. Can you describe the upsert and join problems in detail?


[GitHub] [iceberg] xianyouQ commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
xianyouQ commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-1068825479


   Another issue: after running the rewrite data files action and the expire snapshots action (retaining only the last snapshot), I started a new Flink streaming job, and it did not read the old data. I checked the metadata files: there was only one snapshot, and all manifest files had a smaller sequenceNumber. What should I do if I want to read all the data?


[GitHub] [iceberg] xinbinhuang commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-959847806


   Hi @hameizi, thanks for the great work! I'm really interested in this patch to support `overwrite` snapshots, because 90% of the writes to Iceberg at my current company are `overwrite`, which makes it really hard or impossible to stream-read these tables in Flink. I wonder whether this patch requires Flink 1.13, or whether it also works with 1.12?
   
   @openinx do you know whether this PR can make it into the next release? If so, roughly when will the next release be, or does it depend on the progress of the priority 1 items in the roadmap?


[GitHub] [iceberg] MOBIN-F edited a comment on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
MOBIN-F edited a comment on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-950653689


    @hameizi
   I merged this PR and did some testing and when I update a record, it actually inserts a new record
   
   Flink SQL> CREATE TABLE iceberg_table_upsert8(
   >     id   BIGINT,
   >     data STRING,
   >     PRIMARY KEY (id) NOT ENFORCED
   > ) WITH (
   >      'connector'='iceberg',
   >      'format-version' = '2',
   >      'catalog-name'='hive_prod',
   >      'catalog-type'='hive',
   >      'write.upsert.enable'='true',
   >      'uri'='thrift://localhost:9083',
   >      'warehouse'='file:///tmp/iceberg/'
   > );
   
   INSERT INTO iceberg_table_upsert8 VALUES(1,'qq');
   
   INSERT INTO iceberg_table_upsert8 VALUES(1,'aa');
   
   Flink SQL> SELECT * FROM iceberg_table_upsert8 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
   +----+----------------------+--------------------------------+
   | op |                   id |                           data |
   +----+----------------------+--------------------------------+
   | -D |                    1 |                             qq |
   | +I |                    1 |                             qq |
   | -D |                    1 |                             aa |
   | +I |                    1 |                             aa |
   
   When I query the table in Flink batch mode, it has two records:
   Flink SQL> select * from iceberg_table_upsert8;
   +----+----------------------+--------------------------------+
   | op |                   id |                           data |
   +----+----------------------+--------------------------------+
   | +I |                    1 |                             qq |
   | +I |                    1 |                             aa |
   +----+----------------------+--------------------------------+
   Received a total of 2 rows


[GitHub] [iceberg] openinx commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-945681519


   Thanks for the work, @hameizi. I think I will take a look at this tomorrow!


[GitHub] [iceberg] liubo1022126 commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
liubo1022126 commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-1043817208


   @hameizi @openinx @rdblue Hi, we are currently using the Flink upsert function. In which version is this PR planned to be released?


[GitHub] [iceberg] MOBIN-F commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
MOBIN-F commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-950676190


   > @MOBIN-F That's true; this PR translates an update into -D and +I.
   
   The query returns the record (1,qq), but this record is from before the update. Is that right?


[GitHub] [iceberg] MOBIN-F edited a comment on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
MOBIN-F edited a comment on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-950653689


    @hameizi
   I merged this PR and did some testing and when I update a record, it actually inserts a new record
   
   Flink SQL> CREATE TABLE iceberg_table_upsert8(
   >     id   BIGINT,
   >     data STRING,
   >     PRIMARY KEY (id) NOT ENFORCED
   > ) WITH (
   >      'connector'='iceberg',
   >      'format-version' = '2',
   >      'catalog-name'='hive_prod',
   >      'catalog-type'='hive',
   >      'write.upsert.enable'='true',
   >      'uri'='thrift://localhost:9083',
   >      'warehouse'='file:///tmp/iceberg/'
   > );
   
   INSERT INTO iceberg_table_upsert1 VALUES(1,'qq');
   
   INSERT INTO iceberg_table_upsert1 VALUES(1,'aa');
   
   Flink SQL> SELECT * FROM iceberg_table_upsert8 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
   +----+----------------------+--------------------------------+
   | op |                   id |                           data |
   +----+----------------------+--------------------------------+
   | -D |                    1 |                             qq |
   | +I |                    1 |                             qq |
   | -D |                    1 |                             aa |
   | +I |                    1 |                             aa |
   
   When I query the table in Flink batch mode, it has two records:
   Flink SQL> select * from iceberg_table_upsert8;
   +----+----------------------+--------------------------------+
   | op |                   id |                           data |
   +----+----------------------+--------------------------------+
   | +I |                    1 |                             qq |
   | +I |                    1 |                             aa |
   +----+----------------------+--------------------------------+
   Received a total of 2 rows


[GitHub] [iceberg] openinx commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-918112411


   Thanks @hameizi for the contribution. I will take a look tomorrow.


[GitHub] [iceberg] hameizi commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-950659188


   @MOBIN-F That's true; this PR translates an update into -D and +I.
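
One way to read the -D/+I stream shown in the thread is to apply it to state keyed by the primary key. The sketch below is only an illustration under a loud assumption: every -D removes the row by its `id` alone, regardless of the `data` value. `ChangelogMaterializer` and `apply` are hypothetical names and are not part of Flink or Iceberg.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: materializes a changelog into a table keyed by id.
class ChangelogMaterializer {

  // Each change is {op, id, data}, e.g. {"+I", "1", "aa"}.
  static Map<Long, String> apply(String[][] changes) {
    Map<Long, String> state = new LinkedHashMap<>();
    for (String[] change : changes) {
      long id = Long.parseLong(change[1]);
      if (change[0].equals("+I")) {
        state.put(id, change[2]);   // insert or replace the row for this key
      } else if (change[0].equals("-D")) {
        state.remove(id);           // assumption: delete by key, ignoring the data value
      }
    }
    return state;
  }

  public static void main(String[] args) {
    String[][] changes = {
      {"-D", "1", "qq"}, {"+I", "1", "qq"},
      {"-D", "1", "aa"}, {"+I", "1", "aa"},
    };
    System.out.println(apply(changes));
  }
}
```

Under that assumption, the four changelog rows from the streaming query collapse to a single row for `id` 1 with data `aa`.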


[GitHub] [iceberg] kingeasternsun commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
kingeasternsun commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-1023020791


   > I merged this PR and did some testing. I used Flink CDC to consume the binlog and write to an Iceberg table, then ran some Flink SQL streaming queries and compared them with the same queries on the original MySQL tables; the results did not match.
   > 
   > After some debugging I found 2 issues.
   > 
   > 1. When Flink writes CDC data to Iceberg, it ignores UPDATE_BEFORE and treats UPDATE_AFTER as a retract message, which is not correct. The relevant code is as follows:
   > 
   > ```
   > //org.apache.iceberg.flink.sink.BaseDeltaTaskWriter
   >  switch (row.getRowKind()) {
   >   case INSERT:
   >   case UPDATE_AFTER:
   >     if (upsert) {
   >       writer.delete(row);
   >     }
   >     writer.write(row);
   >     break;
   > 
   >   case UPDATE_BEFORE:
   >     if (upsert) {
   >       break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
   >     }
   >     writer.delete(row);
   >     break;
   >   case DELETE:
   >     writer.delete(row);
   >     break;
   > 
   >   default:
   >     throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
   > }
   > ```
   > 
   > 2. When only delete operations occur during a snapshot, the Iceberg snapshot contains only equality delete files and no data file (the data file is deleted), so Flink ignores the equality delete files and misses all -D rows. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.
   > 
   > After fixing the above 2 issues, the test passed.
   
   
   
   > BaseDeltaTaskWriter
   
   I think this code would be clearer if written like this:
   ```java
     public void write(RowData row) throws IOException {
       RowDataDeltaWriter writer = route(row);
   
       switch (row.getRowKind()) {
         case INSERT:
           if (upsert) {
             writer.delete(row);
           }
           writer.write(row);
           break;
         case UPDATE_AFTER:
           writer.write(row);
           break;
         case UPDATE_BEFORE:
         case DELETE:
           writer.delete(row);
           break;
   
         default:
           throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
       }
     }
   ```
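
To make the difference between the two `switch` variants in this thread concrete, the sketch below traces which writer calls each one would issue for a single row. It is a simplified model, not Iceberg code: `DeltaWriterTrace`, its local `RowKind` enum, and the string-based operation log are hypothetical stand-ins for `RowDataDeltaWriter` and Flink's `RowKind`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of writer calls; rows are plain strings.
class DeltaWriterTrace {
  enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Mirrors the switch quoted from BaseDeltaTaskWriter.
  static List<String> quoted(RowKind kind, String row, boolean upsert) {
    List<String> ops = new ArrayList<>();
    switch (kind) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsert) {
          ops.add("delete " + row);
        }
        ops.add("write " + row);
        break;
      case UPDATE_BEFORE:
        if (upsert) {
          break; // skipped entirely in upsert mode
        }
        ops.add("delete " + row);
        break;
      case DELETE:
        ops.add("delete " + row);
        break;
    }
    return ops;
  }

  // Mirrors the variant suggested in the comment above.
  static List<String> suggested(RowKind kind, String row, boolean upsert) {
    List<String> ops = new ArrayList<>();
    switch (kind) {
      case INSERT:
        if (upsert) {
          ops.add("delete " + row);
        }
        ops.add("write " + row);
        break;
      case UPDATE_AFTER:
        ops.add("write " + row);
        break;
      case UPDATE_BEFORE:
      case DELETE:
        ops.add("delete " + row);
        break;
    }
    return ops;
  }

  public static void main(String[] args) {
    // A CDC update of id 1 from "qq" to "aa", with upsert enabled.
    System.out.println(quoted(RowKind.UPDATE_BEFORE, "1,qq", true));
    System.out.println(quoted(RowKind.UPDATE_AFTER, "1,aa", true));
    System.out.println(suggested(RowKind.UPDATE_BEFORE, "1,qq", true));
    System.out.println(suggested(RowKind.UPDATE_AFTER, "1,aa", true));
  }
}
```

For a CDC update with upsert enabled, the quoted variant issues the delete with the new row image, while the suggested variant issues it with the old row image carried by UPDATE_BEFORE.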


[GitHub] [iceberg] hameizi commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-916588737


   @openinx @stevenzwu Could you help take a look?


[GitHub] [iceberg] hameizi commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-960374866


   > I wonder if this patch requires Flink 1.13, or it'll also work for 1.12?
   
   @xinbinhuang It works with both Flink 1.12 and 1.13.


[GitHub] [iceberg] hameizi commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-945532234


   @openinx Could you help take a look again?


[GitHub] [iceberg] openinx commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-966934609


   @hameizi, would you mind resolving the conflicts? I think I can do a first round of review later.


[GitHub] [iceberg] MOBIN-F commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
MOBIN-F commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-950653689


   I merged this PR and did some testing and when I update a record, it actually inserts a new record
   
   Flink SQL> CREATE TABLE iceberg_table_upsert8(
   >     id   BIGINT,
   >     data STRING,
   >     PRIMARY KEY (id) NOT ENFORCED
   > ) WITH (
   >      'connector'='iceberg',
   >      'format-version' = '2',
   >      'catalog-name'='hive_prod',
   >      'catalog-type'='hive',
   >      'write.upsert.enable'='true',
   >      'uri'='thrift://localhost:9083',
   >      'warehouse'='file:///tmp/iceberg/'
   > );
   
   INSERT INTO iceberg_table_upsert1 VALUES(1,'qq');
   
   INSERT INTO iceberg_table_upsert1 VALUES(1,'aa');
   
   Flink SQL> SELECT * FROM iceberg_table_upsert8 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
   +----+----------------------+--------------------------------+
   | op |                   id |                           data |
   +----+----------------------+--------------------------------+
   | -D |                    1 |                             qq |
   | +I |                    1 |                             qq |
   | -D |                    1 |                             aa |
   | +I |                    1 |                             aa |
   
   When I query the table in Flink batch mode, it has two records:
   Flink SQL> select * from iceberg_table_upsert8;
   +----+----------------------+--------------------------------+
   | op |                   id |                           data |
   +----+----------------------+--------------------------------+
   | +I |                    1 |                             qq |
   | +I |                    1 |                             aa |
   +----+----------------------+--------------------------------+
   Received a total of 2 rows



[GitHub] [iceberg] hameizi commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-1017216735


   > 1. Flink writing CDC data to Iceberg will ignore UPDATE_BEFORE and treat UPDATE_AFTER as a retract message, which is completely incorrect. The relevant code is as follows:
   
   For this issue, the `write.upsert.enabled` option is not suitable for CDC. If your scenario is CDC, you can set it to false, which makes `upsert` false.
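
The behaviour described in the quoted point can be illustrated with a small model. This is a sketch under assumptions, not Iceberg's writer code: the function `apply_changes`, the dict-based table, and the sample CDC events are hypothetical. It models an upsert mode that skips UPDATE_BEFORE and turns UPDATE_AFTER into a delete by key followed by an insert, and shows one case where that loses a delete: a CDC update that changes the key.

```python
def apply_changes(changes, upsert):
    """Apply (kind, key, data) change events to a table modelled as a dict.

    Hypothetical model: with upsert=True, UPDATE_BEFORE is skipped and
    INSERT/UPDATE_AFTER first delete by key and then insert. With
    upsert=False, every event is applied as it arrives.
    """
    table = {}
    for kind, key, data in changes:
        if kind in ("INSERT", "UPDATE_AFTER"):
            if upsert:
                table.pop(key, None)  # delete any existing row with this key
            table[key] = data
        elif kind == "UPDATE_BEFORE":
            if upsert:
                continue  # skipped in upsert mode
            table.pop(key, None)  # retract the old row
        elif kind == "DELETE":
            table.pop(key, None)
    return table


# A CDC update that moves a row from key 1 to key 2 (made-up sample data).
CDC_KEY_CHANGE = [
    ("INSERT", 1, "a"),
    ("UPDATE_BEFORE", 1, "a"),
    ("UPDATE_AFTER", 2, "a"),
]

print(apply_changes(CDC_KEY_CHANGE, upsert=True))   # old row with key 1 survives
print(apply_changes(CDC_KEY_CHANGE, upsert=False))  # only the row with key 2 remains
```

In this model the upsert path never removes the row under the old key, which is consistent with the advice above to disable upsert for CDC input.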
   
   
   > 2\. When only delete operations happen during the period of one snapshot, the Iceberg snapshot contains only equality delete files and no data file (the data file is deleted), so Flink will ignore the equality delete files and miss every -D. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.
   
   I will do more testing on this issue.
   Thanks for your feedback.
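
The second quoted point can be pictured with a simplified planning model. This is only a sketch: `Snapshot` and `plan_tasks` are hypothetical names, the file names are made up, and the code is not the actual `planStreamingFiles()` implementation. It assumes, as the quoted comment states, that one scan task is created per data file with the delete files attached to it.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    """Hypothetical stand-in for the files added by one snapshot."""
    data_files: list = field(default_factory=list)
    eq_delete_files: list = field(default_factory=list)


def plan_tasks(snapshot):
    """Create one task per data file and attach the equality deletes.

    Because tasks are driven by data files, a snapshot that added only
    equality delete files yields no tasks at all.
    """
    return [(data_file, list(snapshot.eq_delete_files))
            for data_file in snapshot.data_files]


mixed = Snapshot(data_files=["data-1.parquet"],
                 eq_delete_files=["eq-delete-1.parquet"])
delete_only = Snapshot(eq_delete_files=["eq-delete-2.parquet"])

print(plan_tasks(mixed))        # one task carrying the delete file
print(plan_tasks(delete_only))  # empty list: the deletes are never read
```

Under that assumption, a streaming reader never sees the delete-only snapshot's equality deletes, so no -D rows are emitted for it.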



[GitHub] [iceberg] Initial-neko commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
Initial-neko commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-964947287


   > > > > @MOBIN-F It's true, this PR translate update to -D and +I.
   > > > 
   > > > 
   > > > It will query the record(1,qq). but this record is from before the update, Is that right?
   > > 
   > > 
   > > If you don't query op of data, you will get only one result (1,aa).
   > 
   > Sorry, I don't understand how to get only one result(1,aa). Could you tell me how to do that?
   
   https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sqlclient/
   
   client mode can help you to do that



[GitHub] [iceberg] MOBIN-F commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
MOBIN-F commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-966852774


   > > > > > @MOBIN-F It's true, this PR translate update to -D and +I.
   > > > > 
   > > > > 
   > > > > It will query the record(1,qq). but this record is from before the update, Is that right?
   > > > 
   > > > 
   > > > If you don't query op of data, you will get only one result (1,aa).
   > > 
   > > 
   > > Sorry, I don't understand how to get only one result(1,aa). Could you tell me how to do that?
   > 
   > https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sqlclient/
   > 
   > client mode can help you to do that
   
   Thanks. After testing, I found the PR problematic: batch reads return duplicate data, the output is not suitable for an Upsert stream, and the join scenario also has problems.
   I'm trying to implement this feature according to the design documentation.



[GitHub] [iceberg] MOBIN-F edited a comment on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
MOBIN-F edited a comment on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-966852774


   > > > > > @MOBIN-F It's true, this PR translate update to -D and +I.
   > > > > 
   > > > > 
   > > > > It will query the record(1,qq). but this record is from before the update, Is that right?
   > > > 
   > > > 
   > > > If you don't query op of data, you will get only one result (1,aa).
   > > 
   > > 
   > > Sorry, I don't understand how to get only one result(1,aa). Could you tell me how to do that?
   > 
   > https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sqlclient/
   > 
   > client mode can help you to do that
   
   Thanks. After testing, I found the PR problematic: batch reads return duplicate data, the output is not suitable for a Retract stream, and the join scenario also has problems.
   I'm trying to implement this feature according to the design documentation.



[GitHub] [iceberg] hameizi commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-966954636


   > @hameizi, would you mind resolving the conflicts? I think I can take a look for the first round later.
   
   @openinx I will resolve the conflicts, but it may take some time because there are many changes in the Flink module. When do you have time to review? Today?



[GitHub] [iceberg] hameizi commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-950678633


   > > @MOBIN-F It's true, this PR translate update to -D and +I.
   > 
   > It will query the record(1,qq). but this record is from before the update, Is that right?
   
   If you don't query op of data, you will get only one result (1,aa).
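
To make the difference between the changelog and the final result concrete, here is a minimal sketch that folds the four changelog rows MOBIN-F reported in the streaming query (-D qq, +I qq, -D aa, +I aa) in two ways. The function names and the folding rules are assumptions for illustration, not Flink's implementation: `fold_by_key` treats -D as "remove whatever row has this id", while `fold_by_row` treats -D as "remove an identical row".

```python
from collections import Counter

# Changelog rows as reported in the thread: (op, id, data).
CHANGELOG = [
    ("-D", 1, "qq"),
    ("+I", 1, "qq"),
    ("-D", 1, "aa"),
    ("+I", 1, "aa"),
]


def fold_by_key(changelog):
    """Upsert-style fold: -D removes whatever row currently holds the key."""
    table = {}
    for op, key, data in changelog:
        if op == "-D":
            table.pop(key, None)
        elif op == "+I":
            table[key] = data
    return sorted(table.items())


def fold_by_row(changelog):
    """Retract-style fold: -D removes only a row with the same id and data."""
    rows = Counter()
    for op, key, data in changelog:
        if op == "+I":
            rows[(key, data)] += 1
        elif op == "-D" and rows[(key, data)] > 0:
            rows[(key, data)] -= 1
    return sorted(rows.elements())


print(fold_by_key(CHANGELOG))  # a single row for id 1
print(fold_by_row(CHANGELOG))  # both versions of id 1 remain
```

In this model only the key-based fold ends with the single row (1,aa). The -D rows carry the new value rather than the old one, so a fold that matches full rows cannot remove (1,qq).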



[GitHub] [iceberg] xianyouQ commented on pull request #3095: Flink: flink read iceberg upsert data use streaming mode

Posted by GitBox <gi...@apache.org>.
xianyouQ commented on pull request #3095:
URL: https://github.com/apache/iceberg/pull/3095#issuecomment-1019691039


   > For this issue, the `write.upsert.enabled` option is not suitable for CDC. If your scenario is CDC, you can set it to false, which makes `upsert` false.
   
   Thanks, I also merged [change-delete-logic](https://github.com/hameizi/iceberg/tree/change-delete-logic); I think it's a better way to write delete rows. I am confused about why a deleted row that is written to the position delete file is also written to the equality delete file.

