Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/21 01:21:19 UTC

[GitHub] [iceberg] NOD507 opened a new issue #3941: [Feature Request] Support for change data capture

NOD507 opened a new issue #3941:
URL: https://github.com/apache/iceberg/issues/3941


   A change data feed option like the one in Delta Lake would be very useful for incremental syncs between tables or systems.
   
   CDC is one feature that's not in open source Delta Lake or Iceberg. I don't think there's a format that supports CDC on open source Spark at this moment.
   
   Hidden partitioning is currently a plus for Iceberg vs Delta Lake, and CDC could be another one.
   
   https://docs.databricks.com/delta/delta-change-data-feed.html
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] NOD507 commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
NOD507 commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1024146798


   > I'm working on an Iceberg CDC design doc. It will be like the CDF (change data feed) in the proprietary Delta. Will keep you posted. It'd be helpful if you can share your CDC use case.
   
   We are interested in CDC so that we can incrementally sync changes from a data lake to a data warehouse. That would let us do ETL in the data lake and use the DWH for end-user consumption.


[GitHub] [iceberg] SreeramGarlapati commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
SreeramGarlapati commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1021469134


   @flyrain - a while back, after a community sync-up, we [started this design doc, which scopes the problem down to the tables that are written with MoR](https://docs.google.com/document/d/1A2oXgzMOhldSdpxhVnuRNYKozAaTAn3zWfE1LIeFLVs/edit?usp=sharing). Please see if you want to adapt it or even edit the same doc.


[GitHub] [iceberg] mohitgargk edited a comment on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
mohitgargk edited a comment on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1023912443


   @flyrain Agree on that. With CoW, without a row identifier, it is not possible to link the new state of a record to its old state without ambiguity.
   
   For the CDF record structure, we dug into the design a bit and came up with [this](https://docs.google.com/document/d/14ZRQc1anvXN52HGoNpSbKy7Ua6_7IIkr6mlXe_gHrJA/edit?usp=sharing). 
   
   The above proposal is based on our use case, where we are interested in changes to a certain column but also want the values of the other columns. If the whole state is part of the CDF record, the consumer can avoid a query/lookup/join against the table to get the rest of the state. 
   
   cc - @SreeramGarlapati @rawataaryan9 @dharamshoo
   
   


[GitHub] [iceberg] flyrain commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1019676189


   I'm working on an Iceberg CDC design doc. It will be like the CDF (change data feed) in the proprietary Delta. Will keep you posted. It'd be helpful if you can share your CDC use case.


[GitHub] [iceberg] flyrain commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1021773631


   Thanks for sharing, @SreeramGarlapati! That is a good start and I'm glad you guys have started working on it. Can we have a discussion once my design doc is ready?


[GitHub] [iceberg] NOD507 commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
NOD507 commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1024148085


   > @flyrain Agree on that. With CoW, without a row identifier, it is not possible to link the new state of a record to its old state without ambiguity.
   > 
   > For the CDF record structure, we dug into the design a bit and came up with [this](https://docs.google.com/document/d/14ZRQc1anvXN52HGoNpSbKy7Ua6_7IIkr6mlXe_gHrJA/edit?usp=sharing).
   > 
   > The above proposal is based on our use case, where we are interested in changes to a certain column but also want the values of the other columns. If the whole state is part of the CDF record, the consumer can avoid a query/lookup/join against the table to get the rest of the state.
   > 
   > cc - @SreeramGarlapati @rawataaryan9 @dharamshoo
   
   This design would work great for our use case; it is exactly what we need. Thanks for sharing.


[GitHub] [iceberg] aokolnychyi commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1064599459


   The algorithm above is not complete and does not cover some edge cases. However, I think it makes sense to explore it and check if we can modify/extend it to cover more use cases.


[GitHub] [iceberg] aokolnychyi commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1064774980


   After thinking more, we may change the assumptions a bit.
   
   ### Revisited Assumptions
   
   - Changes are consumed per snapshot
   
   Iceberg does not distinguish the order of records within a snapshot. That's why Iceberg can only report a summary of what changed in a snapshot. 
   
   By default, rows added and removed in the same snapshot (e.g. Flink CDC) are not shown in the output. In most cases, we just want to see a net delta per snapshot. If a use case requires all records to be seen, it is still doable. For example, if a snapshot adds a data file A with two records (pos 0, 1) and a position delete file D that removes pos 0, we can output something like this.
   
   ```
    _record_type | _commit_snapshot_id | _commit_order | col1 | col2
   -------------------------------------------------------------------
    insert       | s1                  | 0             | 100  | a
    insert       | s1                  | 0             | 101  | a
    delete       | s1                  | 1             | 100  | null
   
   That means records within the same snapshot may have different `_commit_order`. Seems a little bit odd but kind of represents what happens in the Iceberg table. Audit use cases may need this.
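
A minimal Python sketch of how the output above could be produced for one snapshot, assuming a purely in-memory model. `ChangeRecord` and `snapshot_changes` are hypothetical names for illustration, not Iceberg APIs, and `col1` is assumed to be the identity column. Rows from the added data file get `_commit_order` 0 and rows removed by the position delete file get 1. With `net_only=True`, rows added and removed in the same snapshot are dropped, which matches the default described above.

```python
from typing import NamedTuple, Optional


class ChangeRecord(NamedTuple):
    record_type: str            # "insert" or "delete"
    commit_snapshot_id: str
    commit_order: int
    col1: int                   # assumed identity column
    col2: Optional[str]


def snapshot_changes(snapshot_id, added_rows, deleted_positions, net_only=True):
    """added_rows: list of (col1, col2) in file order.
    deleted_positions: set of positions in that same file removed by a
    position delete file committed in the same snapshot."""
    records = []
    for pos, (col1, col2) in enumerate(added_rows):
        if net_only and pos in deleted_positions:
            continue  # added and removed in the same snapshot: no net change
        records.append(ChangeRecord("insert", snapshot_id, 0, col1, col2))
    if not net_only:
        for pos in sorted(deleted_positions):
            col1, _ = added_rows[pos]
            # The delete record carries only the identity value, as in the example.
            records.append(ChangeRecord("delete", snapshot_id, 1, col1, None))
    return records


file_a = [(100, "a"), (101, "a")]
full = snapshot_changes("s1", file_a, {0}, net_only=False)  # audit-style output
net = snapshot_changes("s1", file_a, {0})                   # default net delta
```

Here `full` holds the three records from the table above, while `net` holds only the insert for `col1 = 101`.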
   
   - Output only delete and insert record types by default
   
   Iceberg does not have a notion of an update, which means constructing pre/post images will require some computation. In a lot of cases, this won't be needed (e.g. refresh of a materialized view, syncing changes with an external system). I think we should prefer a more efficient algorithm if possible. 
   
   There seems to be a way to build pre/post update images (see the comment above) but it will require joins and equality deletes will be the trickiest. For instance, a single equality delete may match a number of records and we have to report all of them.
   
   - Table must have identity columns defined
   
   Whenever we apply a delta log to a target table, most CDC use cases rely on a primary key or a set of identity columns. I think it is reasonable to assume Iceberg tables should have identity columns defined to support generation of CDC records.
   
   - Delete records may only include values for identity columns
   
   It is not required to output the entire deleted record if other columns are not stored in equality delete files. If values for other columns are not persisted, this would require an expensive join to reconstruct the entire row.
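
To illustrate why identity columns matter on the consumer side, here is a rough Python sketch of applying one snapshot's delete/insert records to a keyed target. Everything in it is an assumption for illustration: `apply_changes` is a hypothetical helper, the target is a plain dict, and deletes are applied before inserts as one possible convention, since record order within a snapshot is not defined and an update shows up as a delete plus an insert for the same identity.

```python
def apply_changes(target, changes, identity_cols):
    """target: dict mapping identity tuple -> row dict.
    changes: list of (record_type, row dict) for a single snapshot.
    Delete rows may carry values for the identity columns only."""
    def key(row):
        return tuple(row[c] for c in identity_cols)

    # Deletes first, so an update (delete + insert) ends with the new row.
    for record_type, row in changes:
        if record_type == "delete":
            target.pop(key(row), None)
    for record_type, row in changes:
        if record_type == "insert":
            target[key(row)] = dict(row)
    return target


target = {(100,): {"id": 100, "val": "a"}, (101,): {"id": 101, "val": "b"}}
changes = [
    ("insert", {"id": 100, "val": "c"}),  # new image of id=100
    ("delete", {"id": 100}),              # identity columns only
    ("delete", {"id": 101}),
]
apply_changes(target, changes, ["id"])
```

After the call, `target` contains only the row for `id = 100` with `val = "c"`. Without identity columns there would be no key to match the delete records against.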
   
   ### Open questions
   
   - How to deal with changing identity columns?
   - How to deal with cases when an equality delete is issued using a set of columns that is different from identity columns?
   
   ### MVP
   
   - Implement a Spark action that would output delete/insert record types per snapshot.
   
   ### Future
   
   - Option for outputting records added and removed in the same snapshot.
   - Option for pre/post update images.
   - Different ways to consume these changes (e.g. cdc metadata table).


[GitHub] [iceberg] donatobarone commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
donatobarone commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1047116823


   +1. We have started to investigate Iceberg internally and realised that the CDC available today only covers append operations. That is definitely not ideal, as processes will have to be built around the point-in-time functionality to get the changes. 
   Great that the community is looking into this.


[GitHub] [iceberg] aokolnychyi edited a comment on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1064596725


   > So having an Update record type would help to segregate it from Insert and Delete records in more convenient way.
   
   The problem right now is that there is no update in Iceberg. There are only inserts and deletes. An update is represented as a delete followed by an insert. That being said, there may be a way to construct an update record given delete and insert records. For example, we can shuffle the delete/insert records so that all record types for the same identity columns are next to each other. 
   
   ```
   delete, s1, 100, null
   insert, s1, 100, 1
   ```
   
   In this case, we can construct a post update image. I am not sure how we can construct a pre update image without joining the delete record with the target table (that's going to be expensive). We can do that more or less efficiently for position deletes and copy-on-write but equality deletes may include only values for identity columns. We will have to scan a lot of data to reconstruct a pre update image for equality deletes.
   
   This would only work if the identity columns are not modified.
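
The co-location idea can be sketched in plain Python. This is only an illustration under stated assumptions: `build_updates` and the `update_post` record type are hypothetical names, records are dicts, and a group is treated as an update only when it has exactly one delete and one insert for the same identity values.

```python
from collections import defaultdict


def build_updates(records, identity_cols):
    """records: list of dicts with a "_record_type" key plus column values."""
    groups = defaultdict(lambda: {"delete": [], "insert": []})
    for rec in records:
        key = tuple(rec[c] for c in identity_cols)
        groups[key][rec["_record_type"]].append(rec)  # co-locate by identity

    out = []
    for group in groups.values():
        if len(group["delete"]) == 1 and len(group["insert"]) == 1:
            # A delete plus an insert for the same identity: emit a post-update image.
            post = dict(group["insert"][0])
            post["_record_type"] = "update_post"  # hypothetical record type
            out.append(post)
        else:
            out.extend(group["delete"] + group["insert"])
    return out


records = [
    {"_record_type": "delete", "_commit_snapshot_id": "s1", "id": 100, "val": None},
    {"_record_type": "insert", "_commit_snapshot_id": "s1", "id": 100, "val": 1},
    {"_record_type": "insert", "_commit_snapshot_id": "s1", "id": 101, "val": 2},
]
result = build_updates(records, ["id"])
```

The pre-update image is not produced here because the delete record carries only identity values, which is the expensive part discussed above.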
   
   > Also, I noticed that in example 2 above, CDC records are generated for unchanged records (id=106). For Copy-On-Write tables, would this be the behaviour of CDC?
   
   This one is a little bit easier. To start with, we can report unchanged rows, as that is exactly what happens in the table. Whenever we rewrite a file in copy-on-write, we delete all rows from that file and add new records, where some records can be simply copied over. In the future, we can use the above idea and co-locate entries for the same identity columns. Then we can remove pairs where a record is deleted and added without any changes. This won't require any joins with the target table, so it won't be that expensive. Maybe the action can have an option to perform this deduplication. That way, rows that were copied over in copy-on-write won't be part of the output.
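
A small Python sketch of that deduplication, assuming delete records carry the full row (which holds for copy-on-write, where the whole file is rewritten). `drop_unchanged` is a hypothetical helper; it cancels each delete against one identical insert using multiset counts, so no join with the target table is involved.

```python
from collections import Counter


def drop_unchanged(records):
    """records: list of (record_type, row), where row is a tuple of all column values."""
    deleted = Counter(row for t, row in records if t == "delete")
    inserted = Counter(row for t, row in records if t == "insert")
    carried = deleted & inserted  # rows deleted and re-added without changes

    # Each carried-over row cancels exactly one delete and one insert.
    skip = {"delete": Counter(carried), "insert": Counter(carried)}
    out = []
    for t, row in records:
        if skip[t][row] > 0:
            skip[t][row] -= 1
            continue
        out.append((t, row))
    return out


# A copy-on-write rewrite of one file: id=105 changes, id=106 is copied over.
cow_records = [
    ("delete", (105, "a")),
    ("delete", (106, "b")),
    ("insert", (105, "c")),
    ("insert", (106, "b")),
]
changed_only = drop_unchanged(cow_records)
```

Only the delete and insert for `id = 105` remain in `changed_only`; the pair for the copied-over `id = 106` is removed.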
   


[GitHub] [iceberg] Reo-LEI commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1038873547


   > @flyrain We are looking into two possible approaches here.
   > 1. Materialised, where, as with the Delta Lake Change Data Feed, the feed is generated in the write path.
   > 2. Change Data Feed is evaluated at the read path by consumers.
   
   I drafted a [design document](https://docs.google.com/document/d/1zEpNYcA5Tf5ysdoj3jO425A1QRI-3OMb_Fy8EG_9DD4/edit?usp=sharing) for case 1, based on write-time logging. 
   
   But as @flyrain mentioned by email, the approach based on write-time logging would require changing the table spec and changing all engines to fit the new table spec. Therefore, I tend towards @flyrain's approach and will try to integrate it into Flink. 
   
   I'm just pasting my approach here so we can find it later when we need it.


[GitHub] [iceberg] dharamsahoo commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
dharamsahoo commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1064323406


   Hi @aokolnychyi , thanks for the well illustrated writeup. I have a few comments related to the assumptions.
   
   > Output only delete and insert record types
   
   Some consumers might want to directly consume the CDC records and generate events/notifications. It would be useful for such consumers to have CDC records relate to the actual operation on the Iceberg table record. So having an Update record type would help to segregate it from Insert and Delete records in a more convenient way.
   
   > Delete records may only include values for identity columns
   
   For consumers wanting to generate notifications which include old & new values like sending email for customer preference update OR generate events when an attribute value crosses a threshold, the inclusion of non-identity column(s) in the Delete record (related to an Update operation on Iceberg table) will be very useful.
   
   Also, I noticed that in example 2 above, CDC records are generated for unchanged records (id=106). For Copy-On-Write tables, would this be the behaviour of CDC? If so, for snapshots where a small fraction of records are added/modified, there would be too many CDC records.
   
   


[GitHub] [iceberg] flyrain edited a comment on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
flyrain edited a comment on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1023818621


   Hi @mohitgargk, we had multiple discussions internally and are working on a doc. Will share it later. Generally speaking, CDC on MOR is in a better position, so we should target it first. CDC on COW is almost impossible without an additional metadata column or extra logging. cc @aokolnychyi @RussellSpitzer 


[GitHub] [iceberg] flyrain commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1034123302


   We've discussed the solution we are working on in today's community sync-up. The solution can support both COW and MOR. We think it is a good first step towards CDC. Will post the design doc soon.


[GitHub] [iceberg] mohitgargk commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
mohitgargk commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1022844940


   +1
   
   @flyrain We are looking into two possible approaches here.
   1. Materialised, where, as with the Delta Lake Change Data Feed, the feed is generated in the write path.
   2. Change Data Feed is evaluated at the read path by consumers.
   
   At this point, we are exploring and spiking on approach #2. Would love to know your early thoughts on this. 
   


[GitHub] [iceberg] aokolnychyi commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1081273709


   @flyrain, @RussellSpitzer and I chatted a little bit about the design and what a potential implementation could look like. There is a promising idea of using the `_deleted` metadata column for building delete records.
   
   I'll summarize my current way of thinking.
   
   **Algorithm (per snapshot)**
   
   * Build insert records
       * Read all data files added in this snapshot and apply any matching position delete files added in the same snapshot. Equality deletes added in the same snapshot cannot apply (according to the spec).
   * Build delete records.
       * Read all data files marked as deleted in this snapshot. All such records are considered V1 deletes.
       * Build a predicate to use when looking for data records removed by position deletes.
           * Scan through metadata for added position delete files and identify affected partitions (no need to open delete files).
           * Build a predicate using min/max values on file path (no need to open delete files).
           * [future] If the total number of position delete records is reasonably small, read all of the locations and form an IN predicate on file path. The read can be either distributed or local.
       * Build a predicate to use when looking for data records removed by equality deletes.
           * Scan through metadata for added equality delete files and identify affected partitions (no need to open delete files).
           * Build a predicate using min/max values on identity columns or sort keys if such are present in equality delete files.
           * [future] If the total number of equality deletes is small, read all deleted values and form IN predicates on their values. To make this efficient, we may need to add support for IN predicates on multiple columns at the same time. That will require effort given the current expression API in Iceberg.
       * Combine the two predicates using OR. This will be the predicate to look for V2 deletes.
       * Read data files that have potential V2 deletes, project `_deleted` column and keep only deleted records.
       * Union V1 and V2 delete records. This gives us all deletes that happened in this snapshot.
   * Append metadata such as `_commit_snapshot_id` or `_record_type` to both insert and delete records.
   * Union insert and delete records into a single DataFrame that represents a CDC log for this snapshot.
   
   The algorithm is simple; the only downside is that resolving equality deletes may be expensive, as they may not be scoped to any partition. If there is a global equality delete, resolving it and finding all records that were deleted will require a full table scan. In the future, we may consider exposing an option to include equality deletes as-is, without finding the records that were actually deleted. With resolution disabled, a delete record will only contain whatever values are in the delete file. Outputting equality deletes as-is may be enough in many cases.
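
As a rough illustration of the per-snapshot steps above, here is a minimal in-memory sketch in Python. It is not Iceberg code: `DataFile`, `Snapshot`, `candidate_files`, and `changelog` are hypothetical names, files are plain lists of dicts, and the metadata predicate is assumed to have already narrowed the scan down to `candidate_files`.

```python
from dataclasses import dataclass, field


@dataclass
class DataFile:
    path: str
    rows: list  # records stored in the file, as dicts


@dataclass
class Snapshot:
    snapshot_id: str
    added_files: list = field(default_factory=list)       # data files added in this snapshot
    removed_files: list = field(default_factory=list)     # data files marked as deleted
    position_deletes: set = field(default_factory=set)    # (file path, position) pairs
    equality_deletes: list = field(default_factory=list)  # dicts of column -> deleted value
    candidate_files: list = field(default_factory=list)   # older files that may hold V2 deletes


def _record(row, record_type, snapshot_id):
    return {**row, "_record_type": record_type, "_commit_snapshot_id": snapshot_id}


def _is_deleted(snap, path, pos, row):
    # Stand-in for projecting the `_deleted` metadata column.
    if (path, pos) in snap.position_deletes:
        return True
    return any(all(row.get(c) == v for c, v in d.items()) for d in snap.equality_deletes)


def changelog(snap):
    records = []
    # Inserts: rows of added files, minus position deletes from the same snapshot.
    # Equality deletes from the same snapshot are not applied to these files.
    for f in snap.added_files:
        for pos, row in enumerate(f.rows):
            if (f.path, pos) not in snap.position_deletes:
                records.append(_record(row, "insert", snap.snapshot_id))
    # V1 deletes: every row of a data file removed in this snapshot.
    for f in snap.removed_files:
        records.extend(_record(row, "delete", snap.snapshot_id) for row in f.rows)
    # V2 deletes: keep only the rows of candidate files that are flagged as deleted.
    for f in snap.candidate_files:
        for pos, row in enumerate(f.rows):
            if _is_deleted(snap, f.path, pos, row):
                records.append(_record(row, "delete", snap.snapshot_id))
    return records
```

A row that is added and position-deleted in the same snapshot never shows up in the output, which matches the "skip by default" behaviour discussed in this thread.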
   
   **How to apply changes to Iceberg tables?**
   We should be able to apply a CDC log from one Iceberg table to another by simply converting the log to data and equality delete files without doing a MERGE operation.
   
   **How to apply changes to non-Iceberg tables?**
   Use a MERGE operation. Note: whenever a CDC log is fetched, consumers may need to collapse changes that happened across snapshots to not violate the cardinality check in MERGE.
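
One possible way to collapse changes across snapshots is sketched below in plain Python. It assumes each record carries a `_commit_order` and a single identity column (here called `id`, a hypothetical name), and that deletes sharing a `_commit_order` with inserts are applied first. The function name `collapse` is made up for illustration.

```python
def collapse(records, key="id"):
    # Order by commit, with deletes before inserts inside the same commit.
    ordered = sorted(records, key=lambda r: (r["_commit_order"], r["_record_type"] != "delete"))
    first, last = {}, {}
    for r in ordered:
        first.setdefault(r[key], r)
        last[r[key]] = r
    collapsed = []
    for k in first:
        # A leading delete means the key existed before the consumed range.
        if first[k]["_record_type"] == "delete":
            collapsed.append(first[k])
        # A trailing insert means the key exists after the consumed range.
        if last[k]["_record_type"] == "insert":
            collapsed.append(last[k])
    return collapsed
```

The result holds at most one delete and one insert per key, so a MERGE that matches only on delete records sees at most one source row per target row. Keys that were added and removed inside the consumed range drop out entirely.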
   
   **Do we have to output _identity_columns if equality deletes are resolved?** 
   Probably not. That’s why we may omit the requirement for having identity columns defined (at least, for now).
   
   **What should happen if a row is added and removed in the same snapshot?**
   Skip such records by default. If configured to output such records, we may do so by having different `_commit_order` within the same snapshot.
   
   **Can we support pre/post images?**
   I think pre/post images can be computed if equality delete resolution is enabled and identity columns are defined and never changed. Then we can distribute and order records in a way that will co-locate deletes and inserts for the same key next to each other. This should be sufficient to produce pre and post images.
   
   **Shall we output unchanged rows that were copied over during copy-on-write?**
   I'd output copied-over rows by default as it is technically correct: we remove and add back such records from the table format perspective. We may expose an option in the future to skip such records, but it will require extra work. Similarly to pre/post images, if we can co-locate deletes and inserts for the same key next to each other, we may skip the delete and insert if no value has changed.
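
The co-location idea from the last two answers can be sketched as follows. This is only an illustration under several assumptions: delete records carry the full row (equality deletes are resolved), there is at most one delete and one insert per key per snapshot, metadata columns start with an underscore, and the names `to_images`, `update_before`, `update_after`, and `id` are hypothetical.

```python
from collections import defaultdict


def _data_columns(record):
    # Metadata columns are assumed to start with an underscore.
    return {k: v for k, v in record.items() if not k.startswith("_")}


def to_images(records, key="id", skip_unchanged=False):
    # Co-locate the delete and the insert for the same key within a snapshot.
    groups = defaultdict(dict)
    for r in records:
        groups[(r["_commit_snapshot_id"], r[key])][r["_record_type"]] = r
    out = []
    for pair in groups.values():
        if "delete" in pair and "insert" in pair:
            before, after = pair["delete"], pair["insert"]
            if skip_unchanged and _data_columns(before) == _data_columns(after):
                continue  # copied over by copy-on-write, no value changed
            out.append({**before, "_record_type": "update_before"})
            out.append({**after, "_record_type": "update_after"})
        else:
            out.extend(pair.values())
    return out
```

In a distributed engine the grouping step would correspond to distributing and ordering by snapshot and key; the dictionary here only stands in for that.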




[GitHub] [iceberg] coolderli commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
coolderli commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1064821091


   Just sharing my thoughts.
   
   - Output only delete and insert record types by default
   Constructing pre/post images may be necessary. If the downstream is an external system that can handle the insert as an upsert, the update_before (delete) can be dropped. But if the downstream requires an aggregation operation like sum, the update_before cannot be dropped.
   Because records in the current implementation have no metadata like create_timestamp, we can't determine the time of deletion and insertion, so maybe we have to delete before inserting. But this is unacceptable: it will cause the data to jitter, and users will see the data decrease and then return to normal.
   
   - How to deal with changing identity columns?
   Maybe we need to consider adding some restrictions like always storing all rows in equality delete files when there is a streaming read. Then maybe we can use the latest primary key.




[GitHub] [iceberg] rajarshisarkar commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
rajarshisarkar commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1033429341


   Hi @flyrain, please let us know once the design doc is ready.




[GitHub] [iceberg] flyrain commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1036710417


   The design doc is here, https://docs.google.com/document/d/1bN6rdLNcYOHnT3xVBfB33BoiPO06aKBo56SZmuU9pnY/edit?usp=sharing.




[GitHub] [iceberg] flyrain commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1023818621


   Hi @mohitgargk, we had multiple discussions internally and are working on a doc. We will share it later. Generally speaking, CDC on MOR is in a better position, so we should target it first. CDC on COW is almost impossible without an additional metadata column or extra logging.




[GitHub] [iceberg] aokolnychyi commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1061153554


   Based on Yufei’s design doc and what we discussed during the sync, I’d like to share my thoughts on what can be efficiently supported right now.
   
   ### Assumptions
   
   - Changes are consumed per snapshot; rows that are added and removed in the same snapshot can be skipped
   
   Iceberg does not distinguish the order of records within a snapshot. Users may manually deduce this using a data column. However, there is nothing in the metadata to support this at the moment. I believe it is sufficient to have a summary of what changed per snapshot for most use cases (except the audit use case). Specifically, if we add a record and immediately delete it in the same snapshot (Flink CDC), it is okay to skip this record and not report it in the CDC log.
   
   - Output only delete and insert record types
   
   From what I see, just knowing what records to remove and to add should be enough to support most use cases. I have an example below even if the target system is not Iceberg. In the future, we can add pre/post images but computing that with the current table metadata would require a join and would be expensive.
   
   - Table must have identity columns defined
   
   Whenever we apply a delta log to a target table, most CDC use cases rely on a primary key or a set of identity columns. I think it is reasonable to assume Iceberg tables should have identity columns defined to support generation of CDC records.
   
   - Delete records may only include values for identity columns
    
   It is not required to output the entire deleted record if other columns are not stored in equality delete files. If values for other columns are not persisted, this would require an expensive join to reconstruct the entire row.
   
   ### Unsupported
   
   - Audit use cases as they require a seq number per row
   - Pre/post update image (can be added in the future)
   
   The CDC record can include: 
   ```
   _record_type, _commit_snapshot_id, _commit_timestamp, _commit_order, _identity_columns, col1, col2, ...
   ```
   ### Algorithm (per snapshot)
   
   - Build insert records
   	- Read all data files written in this snapshot, applying any matching position delete files produced in the same snapshot. Equality deletes produced in this snapshot cannot apply.
   - Build delete records
   	- Read all data files marked as deleted in this snapshot (they always contain values for identity columns).
   	- Read all equality deletes (they always contain values for identity columns)
   	- Read position deletes, find the list of affected data files from previous snapshots, read those data files, join on `_file`, `_pos` with deletes, project identity columns.
   - Append metadata such as `_commit_snapshot_id` or `_record_type`
   - Union insert and delete records 
   
   The algorithm would only require one join to find identity columns for position deletes. However, it should be fairly efficient as position deletes are scoped to partitions and we know the exact file names to read. We don’t have to read any other files or do a full table scan.
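
A minimal sketch of that single join, in plain Python rather than Spark. It assumes position deletes are available as `(_file, _pos)` pairs and that a caller-supplied `read_data_file` function (hypothetical) returns the rows of one data file in position order.

```python
from collections import defaultdict


def resolve_position_deletes(position_deletes, read_data_file, identity_columns):
    # Group deleted positions by the data file they point to.
    by_file = defaultdict(set)
    for path, pos in position_deletes:
        by_file[path].add(pos)
    delete_records = []
    for path in sorted(by_file):
        rows = read_data_file(path)  # only files named by the deletes are read, once each
        for pos in sorted(by_file[path]):
            # Project just the identity columns of the deleted row.
            delete_records.append({c: rows[pos][c] for c in identity_columns})
    return delete_records
```

Because the delete files name the exact data files, no other file is opened; this is the property that keeps the join cheap.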
   
   I think it should be sufficient to have only delete/insert record types in most cases. A generic MERGE statement can be used to apply changes for a single snapshot to both Iceberg and non-Iceberg tables.
   
   ```
   MERGE INTO target t USING source s
   ON t.id_col = s.id_col AND s._record_type = 'DELETE'
   WHEN MATCHED THEN
     DELETE
   WHEN NOT MATCHED AND s._record_type = 'INSERT' THEN
     INSERT …
   ```
   
   If the destination table is another Iceberg table, we may skip the MERGE statement and write equality deletes and data files directly without querying the destination table.
   
   Here are a few examples (inspired by what Yufei has put together) I went through to see if this approach is going to work (double check it, please).
   
   ![image](https://user-images.githubusercontent.com/6235869/157117118-446f7ecd-77ca-48ae-b250-3254c1dbf529.png)
   




[GitHub] [iceberg] aokolnychyi commented on issue #3941: [Feature Request] Support for change data capture

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on issue #3941:
URL: https://github.com/apache/iceberg/issues/3941#issuecomment-1065427093


   > Constructing pre/post images may be necessary. If the downstream is an external system that can handle the insert as upsert, the update_before(delete) can be dropped.
   
   I agree pre/post images may be necessary and I think there will be a way to build them (like I mention in the comment above). It will require more resources and will be tricky for equality deletes. That's why the proposal is not to include them by default. However, I am not sure I agree that pre/post images are required to apply changes to other systems.
   
   For example, consider the following CDC records (I ignored some columns).
   
   ```
    _record_type | _commit_snapshot_id | _commit_order | id  | value
   --------------+---------------------+---------------+-----+-------
    delete       | s1                  | 0             | 100 | null
    insert       | s1                  | 0             | 100 | a
    insert       | s1                  | 0             | 101 | a
   
   If the external system supports MERGE statements, we can issue the following command:
   
   ```
   MERGE INTO target t USING source s
   ON t.id = s.id AND s._record_type = 'delete'
   WHEN MATCHED THEN
     DELETE
   WHEN NOT MATCHED AND s._record_type = 'insert' THEN
     INSERT …
   ```
   
   Even though we only had deletes/inserts, we still managed to represent an update in the external system.
   
   > But if the downstream requires aggregation operation like sum, the update_before can not be dropped.
   
   Can you elaborate a little bit more to make sure I have a good example to think through? When I explored such cases, I thought it would be sufficient to know what partitions/keys changed so that aggregations affected by this can be recomputed. For instance, if you have an aggregation by day and you know a particular day has either deletes or new inserts, the aggregation must be recomputed.
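
To make the day-level example concrete, here is a small Python sketch of recomputing only the affected aggregates. It assumes the grouping column (`day` in the usage below, a hypothetical name) is present in every CDC record, including deletes, and that the current table contents can be re-read for the touched groups; `refresh_sums` is a made-up helper.

```python
def refresh_sums(current_sums, table_rows, cdc_records, group_col, value_col):
    # Any group that saw a delete or an insert must be recomputed.
    touched = {r[group_col] for r in cdc_records}
    refreshed = dict(current_sums)
    for group in touched:
        values = [row[value_col] for row in table_rows if row[group_col] == group]
        if values:
            refreshed[group] = sum(values)
        else:
            refreshed.pop(group, None)  # every row of the group was deleted
    return refreshed
```

Groups that do not appear in the CDC log keep their previous sums, so no pre-update image is needed for this kind of refresh.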
   
   > Because in the current implementation, the records have no metadata like create_timestamp, we can't determine the time of deletion and insertion, so maybe we have to delete before inserting. But this is unacceptable, this will cause data to jitter, and users will see the data decrease, and then the data returns to normal.
   
   Can you give an example here too? In my examples, I assumed deletes with the same `_commit_order` are applied before inserts. Changes from different snapshots will have different `_commit_order`.
   
   ```
    _record_type | _commit_snapshot_id | _commit_order | id  | value
   --------------+---------------------+---------------+-----+-------
    insert       | s1                  | 0             | 100 | a
    delete       | s2                  | 1             | 100 | null
    insert       | s2                  | 1             | 100 | b
    insert       | s3                  | 2             | 101 | a
   ```
   
   The above example would be an append in `s1`, an update in `s2`, and another append in `s3`.
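
A quick way to check this ordering rule is to replay the records against a keyed target. The sketch below uses a Python dict as the target and assumes the column names from the example; `apply_changes` is a hypothetical helper, not an existing API.

```python
def apply_changes(target, records, key="id"):
    # Sort by commit order; within one commit, deletes come before inserts.
    ordered = sorted(records, key=lambda r: (r["_commit_order"], r["_record_type"] != "delete"))
    for r in ordered:
        if r["_record_type"] == "delete":
            target.pop(r[key], None)
        else:
            target[r[key]] = r["value"]
    return target
```

Replaying the four records above, in any input order, leaves the target with `b` for id 100 and `a` for id 101, and id 100 is never missing between snapshots as long as each snapshot's changes are applied together.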

