Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/14 09:51:32 UTC

[GitHub] [iceberg] hbgstc123 opened a new issue #3905: how to consume historical iceberg data with flink?

hbgstc123 opened a new issue #3905:
URL: https://github.com/apache/iceberg/issues/3905


   Does anyone else need to reprocess historical Iceberg data with Flink?
   For example, say I have a table partitioned by date, with partitions like '2022-01-13' and '2022-01-12', and I want to reprocess the historical data in partitions '2022-01-01' through '2022-01-06'. Is this supported by the current Flink source implementation?
   The reason for using Flink is that an online Flink application already applies the processing logic to incoming Iceberg data incrementally, and I want to reuse that code.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1030876120


   When incremental read was initially implemented in FlinkSource, there were no row-level deletes (V2 spec) in Iceberg. I agree that it should support deletes if it does not already.
   
   For overwrite, I see two scenarios:
   1. Rewrite without new data (like compaction). Here incremental read should ignore overwrite snapshots.
   2. Overwrite due to backfill. In this case, technically we should include the changes (delete all rows from before the overwrite and include all rows from after it). But that would be quite inefficient; maybe the downstream job (the incremental read from Iceberg) should also do a backfill?




[GitHub] [iceberg] hililiwei commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
hililiwei commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1024876282


   cc @hbgstc123  https://github.com/apache/iceberg/blob/c1ca6c536a0248ea752334889ab1c5fcc935f26f/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java#L95




[GitHub] [iceberg] stevenzwu edited a comment on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1030876120


   When incremental read was initially implemented in FlinkSource, there were no row-level deletes (V2 spec) in Iceberg. I agree that it should support deletes if it does not already.
   
   For overwrite, I see two scenarios:
   1. Rewrite without new data (like compaction). Here incremental read should ignore overwrite snapshots. This is probably the more concerning case.
   2. Overwrite due to backfill. In this case, technically we should include the changes (delete all rows from before the overwrite and insert all rows from after it). But that could be quite inefficient; maybe the downstream job (the incremental read from Iceberg) should also do a backfill?
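To make scenario 2 concrete, here is a minimal sketch of what "delete all rows before overwrite and insert all rows after overwrite" would mean for a downstream consumer. It is a toy model in plain Java, not Iceberg or Flink code: the class `OverwriteChangelogSketch`, the `Kind` enum, and the `Change` type are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of replaying an overwrite as a changelog. Not Iceberg or Flink API. */
class OverwriteChangelogSketch {

    /** Hypothetical change kinds for this sketch. */
    enum Kind { DELETE, INSERT }

    /** One changelog entry: the kind of change and the affected row. */
    static final class Change {
        final Kind kind;
        final String row;

        Change(Kind kind, String row) {
            this.kind = kind;
            this.row = row;
        }

        @Override
        public String toString() {
            return kind + " " + row;
        }
    }

    /** Emits a DELETE for every row before the overwrite, then an INSERT for every row after it. */
    static List<Change> overwriteAsChangelog(List<String> before, List<String> after) {
        List<Change> changes = new ArrayList<>();
        for (String row : before) {
            changes.add(new Change(Kind.DELETE, row));
        }
        for (String row : after) {
            changes.add(new Change(Kind.INSERT, row));
        }
        return changes;
    }

    public static void main(String[] args) {
        // Assumed sample rows: only the second row differs after the backfill.
        List<String> before = Arrays.asList("2022-01-01,a", "2022-01-01,b");
        List<String> after = Arrays.asList("2022-01-01,a", "2022-01-01,b2");
        for (Change change : overwriteAsChangelog(before, after)) {
            System.out.println(change);
        }
    }
}
```

In this model the changelog always has `before.size() + after.size()` entries, even when the backfill changed a single row, which is one way to see why replaying an overwrite incrementally could be inefficient.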




[GitHub] [iceberg] hililiwei commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
hililiwei commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1032532349


   @hbgstc123 
   
   ```
   DataStream<RowData> batch = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(false)
        .filters(.......)
        .build();
   ```
   This example reads all current records from the table.
   
   ```
   DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(true)
        .startSnapshotId(table.currentSnapshot().snapshotId())
        .filters(.......)
        .build();
   ```
   This example reads incremental records, starting from the current snapshot.




[GitHub] [iceberg] hbgstc123 commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
hbgstc123 commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1032156319


   @stevenzwu Yes, incremental read is inefficient, so a source that reads directly from the latest snapshot would be much better for scenarios like backfill.




[GitHub] [iceberg] hbgstc123 commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
hbgstc123 commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1030855391


   @stevenzwu @hililiwei The Flink source skips delete and overwrite snapshots. If a table contains these kinds of snapshots, wouldn't the result be incorrect?




[GitHub] [iceberg] hbgstc123 commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
hbgstc123 commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1032621100


   @hililiwei Thanks, I hadn't noticed the code branch that is taken when streaming is set to false. It looks like this configuration can backfill data quite efficiently.
   But with .streaming(false), the SourceFunction produced by the createInput method of StreamExecutionEnvironment doesn't seem to implement any checkpoint functions. So if a failover happens after a successful checkpoint, will the source read from the very beginning again and produce duplicate data?
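The concern above can be illustrated with a small toy model. This is only a sketch of the question being asked, not a statement about how Flink or Iceberg actually behaves: `ReplaySketch` and its methods are hypothetical, and the model assumes that everything emitted before the failure has already reached the sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a bounded source that fails once and restarts. Not Flink API. */
class ReplaySketch {

    /** Appends records[start..end) to the sink and returns the next read position. */
    static int emit(List<String> records, int start, int end, List<String> sink) {
        for (int i = start; i < end; i++) {
            sink.add(records.get(i));
        }
        return end;
    }

    /**
     * Simulates one failure after failAt records, followed by a restart.
     * If restorePosition is false, the restarted source begins again at position 0.
     */
    static List<String> runWithFailure(List<String> records, int failAt, boolean restorePosition) {
        List<String> sink = new ArrayList<>();
        int position = emit(records, 0, failAt, sink);      // first attempt, up to the failure
        int restartAt = restorePosition ? position : 0;      // a source without saved state starts over
        emit(records, restartAt, records.size(), sink);      // second attempt runs to completion
        return sink;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2", "r3", "r4");
        System.out.println("without restored position: " + runWithFailure(records, 2, false));
        System.out.println("with restored position:    " + runWithFailure(records, 2, true));
    }
}
```

In this model, the run without a restored position delivers the first `failAt` records twice, while the run that restores its position delivers each record once.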




[GitHub] [iceberg] hililiwei commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
hililiwei commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1014323934


   Are you just trying to get the latest data between partition '2022-01-01' and '2022-01-06'? 
   
   
   




[GitHub] [iceberg] hbgstc123 commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
hbgstc123 commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1014430390


   @hililiwei yes




[GitHub] [iceberg] stevenzwu commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1016651149


   @hililiwei this can be achieved by setting the filters in `ScanContext` when constructing the `FlinkSource`
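As a rough illustration of what such a filter accomplishes for the date range in the original question, the sketch below models partition pruning in plain Java. It does not call Iceberg: `PartitionRangeFilterSketch`, `DataFileRef`, and `prune` are hypothetical names, and the file paths are made up. In a real job the equivalent predicate would be passed to `.filters(...)` on the source builder.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of pruning data files by a date partition range. Not Iceberg API. */
class PartitionRangeFilterSketch {

    /** Hypothetical stand-in for a data file tagged with its date partition value. */
    static final class DataFileRef {
        final String path;
        final LocalDate partitionDate;

        DataFileRef(String path, String partitionDate) {
            this.path = path;
            this.partitionDate = LocalDate.parse(partitionDate);
        }
    }

    /** Keeps only files whose partition date lies in [from, to], both ends inclusive. */
    static List<DataFileRef> prune(List<DataFileRef> files, LocalDate from, LocalDate to) {
        List<DataFileRef> kept = new ArrayList<>();
        for (DataFileRef file : files) {
            boolean inRange = !file.partitionDate.isBefore(from) && !file.partitionDate.isAfter(to);
            if (inRange) {
                kept.add(file);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Assumed sample files in the latest snapshot; the paths are illustrative only.
        List<DataFileRef> files = Arrays.asList(
                new DataFileRef("date=2021-12-31/f0.parquet", "2021-12-31"),
                new DataFileRef("date=2022-01-01/f1.parquet", "2022-01-01"),
                new DataFileRef("date=2022-01-06/f2.parquet", "2022-01-06"),
                new DataFileRef("date=2022-01-13/f3.parquet", "2022-01-13"));
        LocalDate from = LocalDate.parse("2022-01-01");
        LocalDate to = LocalDate.parse("2022-01-06");
        for (DataFileRef file : prune(files, from, to)) {
            System.out.println(file.path);
        }
    }
}
```

Because the filter is applied to the files of a single snapshot, this approach reads only the final state of the selected partitions and does not need older snapshots to be retained.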




[GitHub] [iceberg] hbgstc123 commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
hbgstc123 commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1014196697


   @Initial-neko Thanks for the reply. If I use the current incremental consumption, I will have the following problems:
   1. I need to keep snapshots for a long time, which is costly and unnecessary.
   2. There must not be any delete or overwrite snapshots in between, as you mentioned; if delete or overwrite snapshots are present, the result will be incorrect. Even if delete and overwrite snapshots are supported in the future, going through all the snapshots will still be inefficient, since I only need the final result.
   
   Maybe a source that applies a partition filter to the latest snapshot of an Iceberg table is a better solution for this scenario.




[GitHub] [iceberg] Initial-neko commented on issue #3905: how to consume historical iceberg data with flink?

Posted by GitBox <gi...@apache.org>.
Initial-neko commented on issue #3905:
URL: https://github.com/apache/iceberg/issues/3905#issuecomment-1014080816


   According to the description, this is a time-partitioned table, and you want the incremental data between two dates.
   
   Flink's incremental consumption can meet your requirements. Consuming overwrite snapshots is not supported at the moment.
   
   Detailed guidance can be found at https://iceberg.apache.org/#flink/ under "Streaming read".

