Posted to dev@iceberg.apache.org by Puneet Zaroo <pz...@netflix.com.INVALID> on 2021/11/01 21:41:59 UTC

Re: Iceberg Delete Compaction Interface Design

I had a few follow-up points.

1 *"(1) for hot partitions, users can try to only perform Convert and
Rewrite to keep delete file sizes and count manageable, until the partition
becomes cold and a Merge can be performed safely.".* : I believe for the
CDC use case it is hard to guarantee that  that partitions will turn cold
and can be merged without conflicts, as 'hotness' is a factor of mutation
rate in the source DB; and perhaps some partitions are always "hot"; so in
essence the following:  *"Merge strategy that does not do any bin-packing,
and only merges the delete files for each data file and writes it back. The
new data file will have the same sequence number as the old file before
Merge"* seems important. Though as a follow-up I am wondering why can't
this strategy do bin-packing or sorting as well; if that is required; as
long as the sequence number is not updated.

2 *"During the commit validation phase of a Merge operation, we need to
verify that for each data file that would be removed, there are no new
position deletes with higher sequence number added."* : Just to be clear;
for the tables only being written into by a Flink CDC pipeline, this should
not happen as position deletes are only created for in-progress
(uncommitted) data files, correct ?
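
To make the sequence-number idea in point 1 concrete, here is a minimal sketch
of how such a rewrite might be invoked, assuming the Spark rewrite-data-files
action API; the table name and the "use-starting-sequence-number" option are
hypothetical placeholders, since no such option existed at the time of this
discussion:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.spark.actions.SparkActions;

    Catalog catalog = ...;  // however the deployment loads its catalog
    Table table = catalog.loadTable(TableIdentifier.of("db", "cdc_table"));

    SparkActions.get()
        .rewriteDataFiles(table)
        .binPack()                                  // or a sort-based strategy
        .filter(Expressions.equal("region", "US"))  // limit to one hot partition
        // hypothetical option: keep the sequence number of the snapshot the
        // compaction started from, so newer equality deletes still apply to the
        // rewritten files and concurrent CDC commits do not conflict
        .option("use-starting-sequence-number", "true")
        .execute();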

Thanks and regards,
- Puneet



On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <ye...@gmail.com> wrote:

> Had some offline discussions on Slack and WeChat.
>
> For Russell's point, we are reconfirming with related people on Slack, and
> will post updates once we have an agreement.
>
> Regarding point 6, for Flink CDC the data file flushed to disk might be
> associated with position deletes, but after the flush all deletes will be
> equality deletes, so 6-2 still works. After all, as long as data files for
> position deletes are not removed, the process should be able to succeed
> with optimistic retry. Currently we are missing the following pieces, which
> need to be worked on to resolve the CDC performance issue:
> 1. We need to support setting the sequence number for individual content
> files.
> 2. During the commit validation phase of a Merge operation, we need to
> verify that for each data file that would be removed, there are no new
> position deletes with a higher sequence number added. If detected, the merge
> of that file has to be completely retried (we can support incremental
> progress for this).
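
A rough sketch of the validation described in item 2, with hypothetical helper
methods (positionDeletesAddedSince, referencesDataFile, sequenceNumberOf)
standing in for pieces the Iceberg API did not expose for this purpose at the
time:

    import java.util.Set;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.exceptions.ValidationException;

    // Called during the Merge commit: fail (and retry that file) if any position
    // delete was committed against a data file we are about to remove.
    void validateNoNewPositionDeletes(Table table, long startingSnapshotId,
                                      Set<DataFile> filesBeingRewritten) {
      // hypothetical helper: position delete files committed after the snapshot
      // the Merge was planned from
      for (DeleteFile delete : positionDeletesAddedSince(table, startingSnapshotId)) {
        for (DataFile data : filesBeingRewritten) {
          // hypothetical helpers: referenced data file and sequence numbers
          if (referencesDataFile(delete, data)
              && sequenceNumberOf(delete) > sequenceNumberOf(data)) {
            throw new ValidationException(
                "Found new position delete %s for data file %s, Merge of this file must be retried",
                delete.path(), data.path());
          }
        }
      }
    }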
>
> -Jack
>
>
> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <ru...@gmail.com>
> wrote:
>
>> I think I understood the Rewrite strategy discussion a little differently
>>
>> BinPackStrategy and SortStrategy each get a new flag which lets you pick
>> files based on their number of delete files. So basically you can set a
>> variety of parameters: small files, large files, files with deletes, etc.
>>
>> A new strategy is added which determines which file to rewrite by looking
>> for all files currently touched by delete files. Instead of looking through
>> files with X deletes, we look up all files affected by deletes and rewrite
>> them. Although now as I write this it's basically running the above
>> strategies with number of delete files >= 1 and files per group at 1. So
>> maybe it doesn't need another strategy?
>>
>> But maybe I got that wrong ...
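
A sketch of that selection idea against the existing scan-planning API; the
delete-count threshold here is just a local parameter, since the actual flag
was still being designed at the time:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    // Pick data files that have at least minDeletes delete files attached;
    // minDeletes = 1 gives the "every file touched by a delete" behaviour.
    List<FileScanTask> selectFilesWithDeletes(Table table, int minDeletes) {
      List<FileScanTask> candidates = new ArrayList<>();
      try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
        for (FileScanTask task : tasks) {
          if (task.deletes().size() >= minDeletes) {
            candidates.add(task);
          }
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return candidates;
    }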
>>
>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <ye...@gmail.com> wrote:
>>
>>> Thanks to everyone who came to the meeting.
>>>
>>> Here is the full meeting recording I made:
>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>
>>> Here are some key takeaways:
>>>
>>> 1. we generally agreed upon the division of compactions into Rewrite,
>>> Convert and Merge.
>>>
>>> 2. Merge will be implemented through RewriteDataFiles as proposed in
>>> https://github.com/apache/iceberg/pull/3207, but as a new strategy that
>>> extends the existing BinPackStrategy. For users who would also like to run
>>> a sort during Merge, we will have another delete strategy that extends the
>>> SortStrategy.
>>>
>>> 3. Merge can have an option that allows users to set the minimum number
>>> of delete files to trigger a compaction. However, that would result in very
>>> frequent compaction of full partitions if people add many global delete
>>> files. A Convert of global equality deletes to partition position deletes
>>> while maintaining the same sequence number can be used to solve the issue.
>>> Currently there is no way to write files with a custom sequence number.
>>> This functionality needs to be added.
>>>
>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>> https://github.com/apache/iceberg/pull/2841.
>>>
>>> 5. we had some discussion around the separation of row and partition
>>> level filters. The general direction in the meeting is to just have a
>>> single filter method. We will sync offline to reach an agreement.
>>>
>>> 6. people raised the issue that if new delete files are added to a data
>>> file while a Merge is going on, then the Merge would fail. That causes huge
>>> performance issues for CDC streaming use cases and makes it very hard for a
>>> Merge to succeed. There are 2 proposed solutions:
>>>   (1) for hot partitions, users can try to only perform Convert and
>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>> becomes cold and a Merge can be performed safely.
>>>   (2) it looks like we need a Merge strategy that does not do any
>>> bin-packing, and only merges the delete files for each data file and writes
>>> it back. The new data file will have the same sequence number as the old
>>> file before Merge. By doing so, new delete files can still be applied
>>> safely and the compaction can succeed without concerns around conflict. The
>>> caveat is that this does not work for position deletes because the row
>>> position changes for each file after Merge. But for the CDC streaming use
>>> case it is acceptable to only write equality deletes, so this looks like a
>>> feasible approach (a rough sketch of this follows point 7 below).
>>>
>>> 7. people raised the concern about the memory consumption issue for the
>>> is_deleted metadata column. We ran out of time and will continue the
>>> discussion offline on Slack.
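
A rough sketch of solution (2) from point 6: every data file that has deletes
is rewritten on its own, with no bin-packing across files, and committed while
keeping its original sequence number. The read/write helpers and the
commented-out sequence-number-preserving commit call are hypothetical, since
Iceberg did not expose them at the time:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

    void mergeDeletesPerFile(Table table) {
      for (FileScanTask task : table.newScan().planFiles()) {
        if (task.deletes().isEmpty()) {
          continue;  // nothing to merge for this data file
        }
        // hypothetical: read the file with its position/equality deletes applied
        CloseableIterable<Record> liveRows = readWithDeletesApplied(table, task);
        // hypothetical: write exactly one replacement file, no bin-packing
        DataFile merged = writeSingleFile(table, task.spec(), task.file().partition(), liveRows);

        table.newRewrite()
            .rewriteFiles(ImmutableSet.of(task.file()), ImmutableSet.of(merged))
            // hypothetical: keep the old file's sequence number so equality
            // deletes committed while this runs still apply to the merged file
            // .dataSequenceNumber(sequenceNumberOf(task.file()))
            .commit();
      }
    }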
>>>
>>> Best,
>>> Jack Ye
>>>
>>>
>>>
>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <ye...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> We are planning to have a meeting to discuss the design of Iceberg
>>>> delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>
>>>> We have also created the channel #compaction on Slack, please join the
>>>> channel for daily discussions if you are interested in the progress.
>>>>
>>>> Best,
>>>> Jack Ye
>>>>
>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <ye...@gmail.com> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> As there are more and more people adopting the v2 spec, we are seeing
>>>>> an increasing number of requests for delete compaction support.
>>>>>
>>>>> Here is a document discussing the use cases and basic interface design
>>>>> for it to get the community aligned around what compactions we would offer
>>>>> and how the interfaces would be divided:
>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>
>>>>> Any feedback would be appreciated!
>>>>>
>>>>> Best,
>>>>> Jack Ye
>>>>>
>>>>

Re: Iceberg Delete Compaction Interface Design

Posted by OpenInx <op...@gmail.com>.
Hi Yufei

There was a proposed PR for this:
https://github.com/apache/iceberg/pull/4522

On Thu, Apr 21, 2022 at 5:42 AM Yufei Gu <fl...@gmail.com> wrote:

> Hi team,
>
> Do we have a PR for this type of delete compaction?
>
>> Merge: the changes specified in delete files are applied to data files
>> and then overwrite the original data file, e.g. merging delete files to
>> data files.
>
>
>
> Yufei
>
>
>
> On Wed, Nov 3, 2021 at 8:29 AM Puneet Zaroo <pz...@netflix.com.invalid>
> wrote:
>
>> Sounds great. I will look at the PRs.
>> thanks,
>>
>> On Tue, Nov 2, 2021 at 11:35 PM Jack Ye <ye...@gmail.com> wrote:
>>
>>>
>>> Yes, I am actually arriving at exactly the same conclusion as you just
>>> now. I was focusing too much on the immediate removal of delete files when
>>> writing the doc and lost sight of the fact that we don't need to remove the
>>> deletes once we have the functionality to preserve sequence numbers.
>>>
>>> I just published https://github.com/apache/iceberg/pull/3454 this afternoon
>>> to add the option for selecting based on deletes in BinPackStrategy; I will
>>> add another PR that preserves the sequence number tomorrow.
>>>
>>> -Jack
>>>
>>> On Tue, Nov 2, 2021 at 11:23 PM Puneet Zaroo <pz...@netflix.com.invalid>
>>> wrote:
>>>
>>>> Thanks for the further clarifications, and for outlining the detailed steps
>>>> for the delete or 'MERGE' compaction. It seems this compaction is explicitly
>>>> geared towards removing delete files. While that may be useful, I feel that
>>>> for CDC tables, doing the bin-pack and sorting compactions and *removing
>>>> the NEED for reading delete files in downstream queries* quickly,
>>>> without conflicts with concurrent CDC updates, is more important. This
>>>> guarantees that downstream query performance is decent soon after the data
>>>> has landed in the table.
>>>> The actual delete file removal can happen in a somewhat delayed manner
>>>> as well, as that is then just a storage optimization: those delete files
>>>> are no longer accessed in the query path.
>>>>
>>>> The above requirements can be achieved if the output of the current
>>>> sorting and bin-pack actions also sets the sequence number to the sequence
>>>> number of the snapshot with which the compaction was started. And of course
>>>> the file selection criteria have to be extended to also pick files which
>>>> have more than a threshold number of delete files associated with them, in
>>>> addition to the criteria already used (incorrect file size for bin-pack or
>>>> range overlap for sort).
>>>>
>>>> Thanks,
>>>> - Puneet
>>>>
>>>> On Tue, Nov 2, 2021 at 7:33 PM Jack Ye <ye...@gmail.com> wrote:
>>>>
>>>>> > I think even with the custom sequence numbers on output data
>>>>> files, the position delete files have to be deleted, *since position
>>>>> deletes also apply on data files with the same sequence number*. Also,
>>>>> unless I am missing something, I think the equality delete files cannot be
>>>>> deleted at the end of this compaction, as it is really hard to figure out
>>>>> if all the impacted data files have been rewritten:
>>>>>
>>>>> The plan is to always remove both position and equality deletes. Given
>>>>> a predicate (e.g. COMPACT table WHERE region='US'), the initial
>>>>> implementation will always compact full partitions by (1) looking for all
>>>>> delete files based on the predicate, (2) getting all impacted partitions, (3)
>>>>> rewriting all data files in those partitions that have deletes, (4) removing
>>>>> those delete files. The algorithm can be improved to target a smaller subset
>>>>> of files, but currently we mostly rely on Iceberg's scan planning, and as you
>>>>> said it's hard to figure out if a delete file (especially an equality delete)
>>>>> covers any additional data files. But we know each delete file only belongs
>>>>> to a single partition, which guarantees the removal is safe. (For
>>>>> partitioned tables, global deletes will be handled separately and not
>>>>> removed unless specifically requested by the user, because that requires a
>>>>> full table compaction; but CDC does not write global deletes anyway.)
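
A rough sketch of those four steps; the helpers for delete-file listing, the
per-file rewrite and the delete removal are hypothetical, and partition
comparison is simplified, so only the overall flow is meant to match the
description above:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    void compactDeletes(Table table) {
      Expression predicate = Expressions.equal("region", "US");  // COMPACT table WHERE region='US'

      // (1) find delete files matching the predicate (hypothetical helper)
      List<DeleteFile> deletes = findDeleteFiles(table, predicate);

      // (2) collect the partitions those delete files belong to
      Set<StructLike> impacted = new HashSet<>();
      for (DeleteFile delete : deletes) {
        impacted.add(delete.partition());
      }

      // (3) rewrite every data file in those partitions that has deletes attached
      for (FileScanTask task : table.newScan().filter(predicate).planFiles()) {
        if (impacted.contains(task.file().partition()) && !task.deletes().isEmpty()) {
          rewriteWithDeletesApplied(table, task);  // hypothetical helper
        }
      }

      // (4) remove the now-redundant delete files (hypothetical helper; in the
      // real action this would be part of the same rewrite commit)
      removeDeleteFiles(table, deletes);
    }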
>>>>>
>>>>> -Jack
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Nov 2, 2021 at 4:10 PM Puneet Zaroo <pz...@netflix.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the clarifications, and thanks for pulling together the
>>>>>> documentation for the row-level delete functionality separately; that
>>>>>> will be very helpful.
>>>>>> I think we are in agreement on most points. I just want to reiterate
>>>>>> my understanding of the merge compaction behavior to make sure we are on
>>>>>> the same page.
>>>>>>
>>>>>> The output table of a Flink CDC pipeline will in a lot of cases have
>>>>>> small files with unsorted data, so doing the bin-pack and sorting
>>>>>> compactions is also important for those tables (and obviously being able to
>>>>>> do so without conflicts with incoming data is also important). If the
>>>>>> existing bin-pack and sort compaction actions are enhanced with
>>>>>>
>>>>>> 1) writing the output data files with the sequence number of the
>>>>>> snapshot with which the compaction was started, *AND*
>>>>>> 2) deleting all position delete files whose data files have been
>>>>>> rewritten,
>>>>>>
>>>>>> then I believe we can run those (bin-pack and sort) compactions as
>>>>>> well without fear of conflicting with newer CDC updates. I think even with
>>>>>> the custom sequence numbers on output data files, the position delete
>>>>>> files have to be deleted, *since position deletes also apply on data
>>>>>> files with the same sequence number*. Also, unless I am missing
>>>>>> something, I think the equality delete files cannot be deleted at the end
>>>>>> of this compaction, as it is really hard to figure out if all the impacted
>>>>>> data files have been rewritten.
>>>>>>
>>>>>> If the bin-pack and sort compactions are enhanced in this manner,
>>>>>> then I foresee just running those, so that the same compaction can take care
>>>>>> of all relevant optimizations for a table (including delete file removal).
>>>>>> At the end, potentially, only some unnecessary equality delete files will
>>>>>> remain, which will have to be deleted by some other action.
>>>>>>
>>>>>> Thanks,
>>>>>> - Puneet
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>>
>>>>>>> > why can't this strategy do bin-packing or sorting as well, if that
>>>>>>> is required, as long as the sequence number is not updated.
>>>>>>> > wouldn't subsequent reads re-apply the delete files which were
>>>>>>> used in the merge as well?
>>>>>>>
>>>>>>> I think you are right, we can use the sequence number of the
>>>>>>> snapshot at which we start the compaction as the new sequence number of all
>>>>>>> rewritten files, because all the deletes with older sequence numbers related
>>>>>>> to the file have already been applied (correct me if I missed anything here).
>>>>>>> But from the correctness perspective it's the same either way, because we
>>>>>>> remove those delete files as part of the compaction, so we will not really
>>>>>>> reapply those deletes.
>>>>>>> With this improvement, we can now also do bin-packing easily. (If we
>>>>>>> preserve all sequence numbers, we can still do bin-packing for files
>>>>>>> sharing the same sequence number; what I described was just the easiest way
>>>>>>> to ensure sequence number preservation.)
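
A small sketch of "bin-packing for files sharing the same sequence number":
group the scan tasks by sequence number and bin-pack within each group, so
every rewritten group can keep that group's sequence number. The
sequence-number accessor and the group planner are hypothetical:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.iceberg.FileScanTask;

    void binPackBySequenceNumber(Iterable<FileScanTask> tasks) {
      Map<Long, List<FileScanTask>> bySequenceNumber = new HashMap<>();
      for (FileScanTask task : tasks) {
        long seq = sequenceNumberOf(task.file());  // hypothetical accessor
        bySequenceNumber.computeIfAbsent(seq, s -> new ArrayList<>()).add(task);
      }

      for (List<FileScanTask> group : bySequenceNumber.values()) {
        // bin-pack only within this group; the rewritten files can safely carry
        // the group's shared sequence number
        planBinPackGroups(group);  // hypothetical planner call
      }
    }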
>>>>>>>
>>>>>>> > for the tables only being written into by a Flink CDC pipeline,
>>>>>>> this should not happen as position deletes are only created for in-progress
>>>>>>> (uncommitted) data files, correct?
>>>>>>>
>>>>>>> Correct
>>>>>>>
>>>>>>> > I believe for the CDC use case it is hard to guarantee that
>>>>>>> partitions will turn cold and can be merged without conflicts, as 'hotness'
>>>>>>> is a function of the mutation rate in the source DB
>>>>>>>
>>>>>>> I agree. When I say hot/cold I am mostly referring to those
>>>>>>> time-partitioned use cases with clear hot-cold division of data. But in the
>>>>>>> end the principle is that the compaction strategy should be determined by
>>>>>>> the characteristics of the partition. If all the partitions always have
>>>>>>> high traffic, then I think merge with a preserved sequence number seems like
>>>>>>> the way to go for the entire table.
>>>>>>>
>>>>>>> I have recently summarized all the related concepts in
>>>>>>> https://github.com/apache/iceberg/pull/3432; it would be great if
>>>>>>> you could take a look and point out anything else I missed, thanks!
>>>>>>>
>>>>>>> Best,
>>>>>>> Jack Ye
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo
>>>>>>> <pz...@netflix.com.invalid> wrote:
>>>>>>>
>>>>>>>> Another follow-up regarding this: *"Merge strategy that does not
>>>>>>>> do any bin-packing, and only merges the delete files for each data file and
>>>>>>>> writes it back. The new data file will have the same sequence number as the
>>>>>>>> old file before Merge"*: shouldn't the sequence number be set to
>>>>>>>> the highest sequence number of any delete file applied to the data file? If
>>>>>>>> the sequence number of the data file is not changed at all, wouldn't
>>>>>>>> subsequent reads re-apply the delete files which were used in the merge as
>>>>>>>> well?
>>>>>>>>
>>>>>>>> thanks
>>>>>>>>

Re: Iceberg Delete Compaction Interface Design

Posted by Yufei Gu <fl...@gmail.com>.
Hi team,

Do we have a PR for this type of delete compaction?

> Merge: the changes specified in delete files are applied to data files
> and then overwrite the original data file, e.g. merging delete files to
> data files.



Yufei




Re: Iceberg Delete Compaction Interface Design

Posted by Puneet Zaroo <pz...@netflix.com.INVALID>.
Sounds great. I will look at the PRs.
thanks,

>>>>>>>>>>>> offer and how the interfaces would be divided:
>>>>>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>>>>>
>>>>>>>>>>>> Any feedback would be appreciated!
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jack Ye
>>>>>>>>>>>>
>>>>>>>>>>>

Re: Iceberg Delete Compaction Interface Design

Posted by Jack Ye <ye...@gmail.com>.
Yes, I am actually arriving at exactly the same conclusion just now. I was
focusing too much on the immediate removal of delete files when writing the
doc and lost sight of the fact that we don't need to remove the deletes
once we have the functionality to preserve sequence numbers.

I just published https://github.com/apache/iceberg/pull/3454 this afternoon
to add the option for selecting files based on deletes in BinPackStrategy;
I will add another PR that preserves the sequence number tomorrow.
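
For what it's worth, here is a minimal sketch of how a caller might combine
the two pieces once they land; the SparkActions entry point is the existing
rewrite action, but the "delete-file-threshold" option name is only my guess
at how the new selection knob could be exposed, not the PR's final API:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.spark.actions.SparkActions;

    public class CompactFilesWithDeletes {
      // Rewrite data files in the 'US' region that carry at least one delete
      // file; the option name below is an assumption, not the merged API.
      public static void run(Table table) {
        SparkActions.get()
            .rewriteDataFiles(table)
            .binPack()
            .filter(Expressions.equal("region", "US"))
            .option("delete-file-threshold", "1")
            .execute();
      }
    }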

-Jack

On Tue, Nov 2, 2021 at 11:23 PM Puneet Zaroo <pz...@netflix.com.invalid>
wrote:

> Thanks for further clarifications, and outlining the detailed steps for
> the delete or 'MERGE' compaction. It seems this compaction is explicitly
> geared towards removing delete files. While that may be useful; I feel for
> CDC tables doing the Bin-pack and Sorting compactions and *removing the
> NEED for reading delete files in downstream queries * quickly without
> conflicts with concurrent CDC updates is more important. This guarantees
> that downstream query performance is decent soon after the data has landed
> in the table.
> The actual delete file removal can happen in a somewhat delayed manner as
> well; as that is now just a storage optimization as those delete files are
> no longer accessed in the query path.
>
> The above requirements can be achieved if the output of the current
> sorting and bin-pack actions also set the sequence number to the sequence
> number of the snapshot with which the compaction was started. And of-course
> the file selection criteria has to be extended to also pick files which
> have > a threshold number of delete files associated with them, in addition
> to the criteria already used (incorrect file size for bin pack or range
> overlap for sort).
>
> Thanks,
> - Puneet
>
> On Tue, Nov 2, 2021 at 7:33 PM Jack Ye <ye...@gmail.com> wrote:
>
>> > I think even with the custom sequence file numbers on output data
>> files; the position delete files have to be deleted; *since position
>> deletes also apply on data files with the same sequence number*. Also,
>> unless I am missing something, I think the equality delete files cannot be
>> deleted at the end of this compaction, as it is really hard to figure out
>> if all the impacted data files have been rewritten:
>>
>> The plan is to always remove both position and equality deletes. Given a
>> predicate (e.g. COMPACT table WHERE region='US'), the initial
>> implementation will always compact full partitions by (1) look for all
>> delete files based on the predicate, (2) get all impacted partitions, (3)
>> rewrite all data files in those partitions that has deletes, (4) remove
>> those delete files. The algorithm can be improved to a smaller subset of
>> files, but currently we mostly rely on Iceberg's scan planning and as you
>> said it's hard to figure out if a delete file (especially equality delete)
>> covers any additional data files. But we know each delete file only belongs
>> to a single partition, which guarantees the removal is safe. (For
>> partitioned tables, global deletes will be handled separately and not
>> removed unless specifically requested by the user because it requires a
>> full table compact, but CDC does not write global deletes anyway)
>>
>> -Jack
>>
>>
>>
>> On Tue, Nov 2, 2021 at 4:10 PM Puneet Zaroo <pz...@netflix.com.invalid>
>> wrote:
>>
>>> Thanks for the clarifications; and thanks for pulling together the
>>> documentation for the row-level delete functionality separately; as that
>>> will be very helpful.
>>> I think we are in agreement on most points. I just want to reiterate my
>>> understanding of the merge compaction behavior to make sure we are on the
>>> same page.
>>>
>>> The output table of a Flink CDC pipeline will in a lot of cases have
>>> small files with unsorted data; so doing the bin-pack and sorting
>>> compactions is also important for those tables (and obviously being able to
>>> do so without conflicts with incoming data is also important). If the
>>> existing bin-pack and sort compaction actions are enhanced with
>>>
>>> 1) writing the output data files with the sequence number of the
>>> snapshot with which the compaction was started with *AND *
>>> 2) deleting all position delete files whose data files have been
>>> rewritten;
>>>
>>> then I believe we can run those (bin-pack and sort) compactions as well
>>> without fear of conflicting with newer CDC updates. I think even with the
>>> custom sequence file numbers on output data files; the position delete
>>> files have to be deleted; *since position deletes also apply on data
>>> files with the same sequence number*. Also, unless I am missing
>>> something, I think the equality delete files cannot be deleted at the end
>>> of this compaction, as it is really hard to figure out if all the impacted
>>> data files have been rewritten.
>>>
>>> If the bin-pack and sort compactions are enhanced in this manner; then I
>>> foresee just running those so that the same compaction can take care of all
>>> relevant optimizations for a table (including delete file removal). At the
>>> end, potentially, only some unnecessary equality delete files will be
>>> remaining which will have to be deleted by some other action.
>>>
>>> Thanks,
>>> - Puneet
>>>
>>>
>>>
>>>
>>> On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <ye...@gmail.com> wrote:
>>>
>>>> > why can't this strategy do bin-packing or sorting as well; if that is
>>>> required; as long as the sequence number is not updated.
>>>> > wouldn't subsequent reads re-apply the delete files which were used
>>>> in the merge as well?
>>>>
>>>> I think you are right, we can use the sequence number of the snapshot
>>>> we start compaction as the new sequence number of all rewritten files
>>>> because all the deletes of older sequence numbers related to the file have
>>>> been applied. (correct me if I missed anything here)
>>>> But from the correctness perspective it's the same because we remove
>>>> those delete files as a part of the compaction so we will not really
>>>> reapply those deletes.
>>>> With this improvement, we can now also do bin-packing easily. (If we
>>>> preserve all sequence numbers, we can still do bin-packing for files
>>>> sharing the same sequence number, what I described was just the easiest way
>>>> to ensure sequence number preservation)
>>>>
>>>> > for the tables only being written into by a Flink CDC pipeline, this
>>>> should not happen as position deletes are only created for in-progress
>>>> (uncommitted) data files, correct ?
>>>>
>>>> Correct
>>>>
>>>> > I believe for the CDC use case it is hard to guarantee that  that
>>>> partitions will turn cold and can be merged without conflicts, as 'hotness'
>>>> is a factor of mutation rate in the source DB
>>>>
>>>> I agree. When I say hot/cold I am mostly referring to those
>>>> time-partitioned use cases with clear hot-cold division of data. But in the
>>>> end the principle is that the compaction strategy should be determined by
>>>> the characteristics of the partition. If all the partitions are always with
>>>> high traffic, then I think merge with preserved sequence number seems like
>>>> the way to go for the entire table.
>>>>
>>>> I have recently summarized all the related concepts in
>>>> https://github.com/apache/iceberg/pull/3432, it would be great if you
>>>> can take a look about anything else I missed, thanks!
>>>>
>>>> Best,
>>>> Jack Ye
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo <pz...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Another follow-up regarding this : *"Merge strategy that does not do
>>>>> any bin-packing, and only merges the delete files for each data file and
>>>>> writes it back. The new data file will have the same sequence number as the
>>>>> old file before Merge"* ; shouldn't the sequence number be set to the
>>>>> highest sequence number of any applied delete file to the data file. If the
>>>>> sequence number of the data file is not changed at-all, wouldn't subsequent
>>>>> reads re-apply the delete files which were used in the merge as well?
>>>>>
>>>>> thanks
>>>>>
>>>>> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pz...@netflix.com>
>>>>> wrote:
>>>>>
>>>>>> I had a few follow-up points.
>>>>>>
>>>>>> 1 *"(1) for hot partitions, users can try to only perform Convert
>>>>>> and Rewrite to keep delete file sizes and count manageable, until the
>>>>>> partition becomes cold and a Merge can be performed safely.".* : I
>>>>>> believe for the CDC use case it is hard to guarantee that  that partitions
>>>>>> will turn cold and can be merged without conflicts, as 'hotness' is a
>>>>>> factor of mutation rate in the source DB; and perhaps some partitions are
>>>>>> always "hot"; so in essence the following:  *"Merge strategy that
>>>>>> does not do any bin-packing, and only merges the delete files for each data
>>>>>> file and writes it back. The new data file will have the same sequence
>>>>>> number as the old file before Merge"* seems important. Though as a
>>>>>> follow-up I am wondering why can't this strategy do bin-packing or sorting
>>>>>> as well; if that is required; as long as the sequence number is not updated.
>>>>>>
>>>>>> 2 *"During the commit validation phase of a Merge operation, we need
>>>>>> to verify that for each data file that would be removed, there are no new
>>>>>> position deletes with higher sequence number added."* : Just to be
>>>>>> clear; for the tables only being written into by a Flink CDC pipeline, this
>>>>>> should not happen as position deletes are only created for in-progress
>>>>>> (uncommitted) data files, correct ?
>>>>>>
>>>>>> Thanks and regards,
>>>>>> - Puneet
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>>
>>>>>>> Had some offline discussions on Slack and WeChat.
>>>>>>>
>>>>>>> For Russell's point, we are reconfirming with related people on
>>>>>>> Slack, and will post updates once we have an agreement.
>>>>>>>
>>>>>>> Regarding point 6, for Flink CDC the data file flushed to disk might
>>>>>>> be associated with position deletes, but after the flush all deletes will
>>>>>>> be equality deletes, so 6-2 still works. After all, as long as data files
>>>>>>> for position deletes are not removed, the process should be able to succeed
>>>>>>> with optimistic retry. Currently we are missing the following that needs to
>>>>>>> be worked on to resolve the CDC performance issue:
>>>>>>> 1. We need to support setting the sequence number for individual
>>>>>>> content files.
>>>>>>> 2. During the commit validation phase of a Merge operation, we need
>>>>>>> to verify that for each data file that would be removed, there are no new
>>>>>>> position deletes with higher sequence number added. If detected, merge of
>>>>>>> that file has to be completely retried (we can support incremental progress
>>>>>>> for this).
>>>>>>>
>>>>>>> -Jack
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>>>>>>> russell.spitzer@gmail.com> wrote:
>>>>>>>
>>>>>>>> I think I understood the Rewrite strategy discussion a little
>>>>>>>> differently
>>>>>>>>
>>>>>>>> Binpack Strategy and SortStrategy each get a new flag which lets
>>>>>>>> you pick files based on their number of delete files. So basically you can
>>>>>>>> set a variety of parameters, small files, large files, files with deletes
>>>>>>>> etc ...
>>>>>>>>
>>>>>>>> A new strategy is added which determines which file to rewrite by
>>>>>>>> looking for all files currently touched by delete files. Instead of looking
>>>>>>>> through files with X deletes, we look up all files affected by deletes and
>>>>>>>> rewrite them. Although now as I write this it's basically running the above
>>>>>>>> strategies with number of delete files >= 1 and files per group at 1. So
>>>>>>>> maybe it doesn't need another strategy?
>>>>>>>>
>>>>>>>> But maybe I got that wrong ...
>>>>>>>>
>>>>>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <ye...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks to everyone who came to the meeting.
>>>>>>>>>
>>>>>>>>> Here is the full meeting recording I made:
>>>>>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>>>>>>
>>>>>>>>> Here are some key takeaways:
>>>>>>>>>
>>>>>>>>> 1. we generally agreed upon the division of compactions into
>>>>>>>>> Rewrite, Convert and Merge.
>>>>>>>>>
>>>>>>>>> 2. Merge will be implemented through RewriteDataFiles as proposed
>>>>>>>>> in https://github.com/apache/iceberg/pull/3207, but instead as a
>>>>>>>>> new strategy by extending the existing BinPackStrategy. For users who would
>>>>>>>>> also like to run sort during Merge, we will have another delete strategy
>>>>>>>>> that extends the SortStrategy.
>>>>>>>>>
>>>>>>>>> 3. Merge can have an option that allows users to set the minimum
>>>>>>>>> numbers of delete files to trigger a compaction. However, that would result
>>>>>>>>> in very frequent compaction of full partition if people add many global
>>>>>>>>> delete files. A Convert of global equality deletes to partition
>>>>>>>>> position deletes while maintaining the same sequence number can be used to
>>>>>>>>> solve the issue. Currently there is no way to write files with a custom
>>>>>>>>> sequence number. This functionality needs to be added.
>>>>>>>>>
>>>>>>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>>>>>>>> https://github.com/apache/iceberg/pull/2841.
>>>>>>>>>
>>>>>>>>> 5. we had some discussion around the separation of row and
>>>>>>>>> partition level filters. The general direction in the meeting is to just
>>>>>>>>> have a single filter method. We will sync offline to reach an agreement.
>>>>>>>>>
>>>>>>>>> 6. people raised the issue that if new delete files are added to a
>>>>>>>>> data file while a Merge is going on, then the Merge would fail. That causes
>>>>>>>>> huge performance issues for CDC streaming use cases and Merge is very hard
>>>>>>>>> to succeed. There are 2 proposed solutions:
>>>>>>>>>   (1) for hot partitions, users can try to only perform Convert
>>>>>>>>> and Rewrite to keep delete file sizes and count manageable, until the
>>>>>>>>> partition becomes cold and a Merge can be performed safely.
>>>>>>>>>   (2) it looks like we need a Merge strategy that does not do any
>>>>>>>>> bin-packing, and only merges the delete files for each data file and writes
>>>>>>>>> it back. The new data file will have the same sequence number as the old
>>>>>>>>> file before Merge. By doing so, new delete files can still be applied
>>>>>>>>> safely and the compaction can succeed without concerns around conflict. The
>>>>>>>>> caveat is that this does not work for position deletes because the row
>>>>>>>>> position changes for each file after Merge. But for the CDC streaming use
>>>>>>>>> case it is acceptable to only write equality deletes, so this looks like a
>>>>>>>>> feasible approach.
>>>>>>>>>
>>>>>>>>> 7. people raised the concern about the memory consumption issue
>>>>>>>>> for the is_deleted metadata column. We ran out of time and will continue
>>>>>>>>> the discussion offline on Slack.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jack Ye
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <ye...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> We are planning to have a meeting to discuss the design of
>>>>>>>>>> Iceberg delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>>>>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>>>>>>
>>>>>>>>>> We have also created the channel #compaction on Slack, please
>>>>>>>>>> join the channel for daily discussions if you are interested in the
>>>>>>>>>> progress.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jack Ye
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <ye...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> As there are more and more people adopting the v2 spec, we are
>>>>>>>>>>> seeing an increasing number of requests for delete compaction support.
>>>>>>>>>>>
>>>>>>>>>>> Here is a document discussing the use cases and basic interface
>>>>>>>>>>> design for it to get the community aligned around what compactions we would
>>>>>>>>>>> offer and how the interfaces would be divided:
>>>>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>>>>
>>>>>>>>>>> Any feedback would be appreciated!
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jack Ye
>>>>>>>>>>>
>>>>>>>>>>

Re: Iceberg Delete Compaction Interface Design

Posted by Puneet Zaroo <pz...@netflix.com.INVALID>.
Thanks for the further clarifications, and for outlining the detailed steps
for the delete or 'MERGE' compaction. It seems this compaction is explicitly
geared towards removing delete files. While that may be useful, I feel that
for CDC tables, running the bin-pack and sorting compactions and *removing
the NEED for reading delete files in downstream queries* quickly, without
conflicts with concurrent CDC updates, is more important. This ensures that
downstream query performance is decent soon after the data has landed in
the table.
The actual delete file removal can happen in a somewhat delayed manner as
well, since at that point it is just a storage optimization: those delete
files are no longer accessed in the query path.

The above requirements can be achieved if the current sorting and bin-pack
actions also set the sequence number of their output data files to the
sequence number of the snapshot at which the compaction was started. And of
course the file selection criteria have to be extended to also pick files
that have more than a threshold number of delete files associated with
them, in addition to the criteria already used (incorrect file size for
bin-pack or range overlap for sort).
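
As a rough sketch of that extended selection criteria (an illustrative
helper, not an existing Iceberg class; the threshold parameters are made
up), a file would become a rewrite candidate if it is badly sized for
bin-pack OR carries too many delete files:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.StreamSupport;

    import org.apache.iceberg.FileScanTask;

    public class CandidateSelection {
      // Pick files that are mis-sized for bin-pack or have at least
      // `deleteFileThreshold` delete files attached to them.
      public static List<FileScanTask> select(
          Iterable<FileScanTask> tasks,
          long minFileSizeBytes,
          long maxFileSizeBytes,
          int deleteFileThreshold) {
        return StreamSupport.stream(tasks.spliterator(), false)
            .filter(task ->
                task.file().fileSizeInBytes() < minFileSizeBytes
                    || task.file().fileSizeInBytes() > maxFileSizeBytes
                    || task.deletes().size() >= deleteFileThreshold)
            .collect(Collectors.toList());
      }
    }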

Thanks,
- Puneet

On Tue, Nov 2, 2021 at 7:33 PM Jack Ye <ye...@gmail.com> wrote:

> > I think even with the custom sequence file numbers on output data files;
> the position delete files have to be deleted; *since position deletes
> also apply on data files with the same sequence number*. Also, unless I
> am missing something, I think the equality delete files cannot be deleted
> at the end of this compaction, as it is really hard to figure out if all
> the impacted data files have been rewritten:
>
> The plan is to always remove both position and equality deletes. Given a
> predicate (e.g. COMPACT table WHERE region='US'), the initial
> implementation will always compact full partitions by (1) look for all
> delete files based on the predicate, (2) get all impacted partitions, (3)
> rewrite all data files in those partitions that has deletes, (4) remove
> those delete files. The algorithm can be improved to a smaller subset of
> files, but currently we mostly rely on Iceberg's scan planning and as you
> said it's hard to figure out if a delete file (especially equality delete)
> covers any additional data files. But we know each delete file only belongs
> to a single partition, which guarantees the removal is safe. (For
> partitioned tables, global deletes will be handled separately and not
> removed unless specifically requested by the user because it requires a
> full table compact, but CDC does not write global deletes anyway)
>
> -Jack
>
>
>
> On Tue, Nov 2, 2021 at 4:10 PM Puneet Zaroo <pz...@netflix.com.invalid>
> wrote:
>
>> Thanks for the clarifications; and thanks for pulling together the
>> documentation for the row-level delete functionality separately; as that
>> will be very helpful.
>> I think we are in agreement on most points. I just want to reiterate my
>> understanding of the merge compaction behavior to make sure we are on the
>> same page.
>>
>> The output table of a Flink CDC pipeline will in a lot of cases have
>> small files with unsorted data; so doing the bin-pack and sorting
>> compactions is also important for those tables (and obviously being able to
>> do so without conflicts with incoming data is also important). If the
>> existing bin-pack and sort compaction actions are enhanced with
>>
>> 1) writing the output data files with the sequence number of the snapshot
>> with which the compaction was started with *AND *
>> 2) deleting all position delete files whose data files have been
>> rewritten;
>>
>> then I believe we can run those (bin-pack and sort) compactions as well
>> without fear of conflicting with newer CDC updates. I think even with the
>> custom sequence file numbers on output data files; the position delete
>> files have to be deleted; *since position deletes also apply on data
>> files with the same sequence number*. Also, unless I am missing
>> something, I think the equality delete files cannot be deleted at the end
>> of this compaction, as it is really hard to figure out if all the impacted
>> data files have been rewritten.
>>
>> If the bin-pack and sort compactions are enhanced in this manner; then I
>> foresee just running those so that the same compaction can take care of all
>> relevant optimizations for a table (including delete file removal). At the
>> end, potentially, only some unnecessary equality delete files will be
>> remaining which will have to be deleted by some other action.
>>
>> Thanks,
>> - Puneet
>>
>>
>>
>>
>> On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <ye...@gmail.com> wrote:
>>
>>> > why can't this strategy do bin-packing or sorting as well; if that is
>>> required; as long as the sequence number is not updated.
>>> > wouldn't subsequent reads re-apply the delete files which were used in
>>> the merge as well?
>>>
>>> I think you are right, we can use the sequence number of the snapshot we
>>> start compaction as the new sequence number of all rewritten files because
>>> all the deletes of older sequence numbers related to the file have been
>>> applied. (correct me if I missed anything here)
>>> But from the correctness perspective it's the same because we remove
>>> those delete files as a part of the compaction so we will not really
>>> reapply those deletes.
>>> With this improvement, we can now also do bin-packing easily. (If we
>>> preserve all sequence numbers, we can still do bin-packing for files
>>> sharing the same sequence number, what I described was just the easiest way
>>> to ensure sequence number preservation)
>>>
>>> > for the tables only being written into by a Flink CDC pipeline, this
>>> should not happen as position deletes are only created for in-progress
>>> (uncommitted) data files, correct ?
>>>
>>> Correct
>>>
>>> > I believe for the CDC use case it is hard to guarantee that  that
>>> partitions will turn cold and can be merged without conflicts, as 'hotness'
>>> is a factor of mutation rate in the source DB
>>>
>>> I agree. When I say hot/cold I am mostly referring to those
>>> time-partitioned use cases with clear hot-cold division of data. But in the
>>> end the principle is that the compaction strategy should be determined by
>>> the characteristics of the partition. If all the partitions are always with
>>> high traffic, then I think merge with preserved sequence number seems like
>>> the way to go for the entire table.
>>>
>>> I have recently summarized all the related concepts in
>>> https://github.com/apache/iceberg/pull/3432, it would be great if you
>>> can take a look about anything else I missed, thanks!
>>>
>>> Best,
>>> Jack Ye
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo <pz...@netflix.com.invalid>
>>> wrote:
>>>
>>>> Another follow-up regarding this : *"Merge strategy that does not do
>>>> any bin-packing, and only merges the delete files for each data file and
>>>> writes it back. The new data file will have the same sequence number as the
>>>> old file before Merge"* ; shouldn't the sequence number be set to the
>>>> highest sequence number of any applied delete file to the data file. If the
>>>> sequence number of the data file is not changed at-all, wouldn't subsequent
>>>> reads re-apply the delete files which were used in the merge as well?
>>>>
>>>> thanks
>>>>
>>>> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pz...@netflix.com> wrote:
>>>>
>>>>> I had a few follow-up points.
>>>>>
>>>>> 1 *"(1) for hot partitions, users can try to only perform Convert and
>>>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>>>> becomes cold and a Merge can be performed safely.".* : I believe for
>>>>> the CDC use case it is hard to guarantee that  that partitions will turn
>>>>> cold and can be merged without conflicts, as 'hotness' is a factor of
>>>>> mutation rate in the source DB; and perhaps some partitions are always
>>>>> "hot"; so in essence the following:  *"Merge strategy that does not
>>>>> do any bin-packing, and only merges the delete files for each data file and
>>>>> writes it back. The new data file will have the same sequence number as the
>>>>> old file before Merge"* seems important. Though as a follow-up I am
>>>>> wondering why can't this strategy do bin-packing or sorting as well; if
>>>>> that is required; as long as the sequence number is not updated.
>>>>>
>>>>> 2 *"During the commit validation phase of a Merge operation, we need
>>>>> to verify that for each data file that would be removed, there are no new
>>>>> position deletes with higher sequence number added."* : Just to be
>>>>> clear; for the tables only being written into by a Flink CDC pipeline, this
>>>>> should not happen as position deletes are only created for in-progress
>>>>> (uncommitted) data files, correct ?
>>>>>
>>>>> Thanks and regards,
>>>>> - Puneet
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>
>>>>>> Had some offline discussions on Slack and WeChat.
>>>>>>
>>>>>> For Russell's point, we are reconfirming with related people on
>>>>>> Slack, and will post updates once we have an agreement.
>>>>>>
>>>>>> Regarding point 6, for Flink CDC the data file flushed to disk might
>>>>>> be associated with position deletes, but after the flush all deletes will
>>>>>> be equality deletes, so 6-2 still works. After all, as long as data files
>>>>>> for position deletes are not removed, the process should be able to succeed
>>>>>> with optimistic retry. Currently we are missing the following that needs to
>>>>>> be worked on to resolve the CDC performance issue:
>>>>>> 1. We need to support setting the sequence number for individual
>>>>>> content files.
>>>>>> 2. During the commit validation phase of a Merge operation, we need
>>>>>> to verify that for each data file that would be removed, there are no new
>>>>>> position deletes with higher sequence number added. If detected, merge of
>>>>>> that file has to be completely retried (we can support incremental progress
>>>>>> for this).
>>>>>>
>>>>>> -Jack
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>>>>>> russell.spitzer@gmail.com> wrote:
>>>>>>
>>>>>>> I think I understood the Rewrite strategy discussion a little
>>>>>>> differently
>>>>>>>
>>>>>>> Binpack Strategy and SortStrategy each get a new flag which lets you
>>>>>>> pick files based on their number of delete files. So basically you can set
>>>>>>> a variety of parameters, small files, large files, files with deletes etc
>>>>>>> ...
>>>>>>>
>>>>>>> A new strategy is added which determines which file to rewrite by
>>>>>>> looking for all files currently touched by delete files. Instead of looking
>>>>>>> through files with X deletes, we look up all files affected by deletes and
>>>>>>> rewrite them. Although now as I write this it's basically running the above
>>>>>>> strategies with number of delete files >= 1 and files per group at 1. So
>>>>>>> maybe it doesn't need another strategy?
>>>>>>>
>>>>>>> But maybe I got that wrong ...
>>>>>>>
>>>>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks to everyone who came to the meeting.
>>>>>>>>
>>>>>>>> Here is the full meeting recording I made:
>>>>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>>>>>
>>>>>>>> Here are some key takeaways:
>>>>>>>>
>>>>>>>> 1. we generally agreed upon the division of compactions into
>>>>>>>> Rewrite, Convert and Merge.
>>>>>>>>
>>>>>>>> 2. Merge will be implemented through RewriteDataFiles as proposed
>>>>>>>> in https://github.com/apache/iceberg/pull/3207, but instead as a
>>>>>>>> new strategy by extending the existing BinPackStrategy. For users who would
>>>>>>>> also like to run sort during Merge, we will have another delete strategy
>>>>>>>> that extends the SortStrategy.
>>>>>>>>
>>>>>>>> 3. Merge can have an option that allows users to set the minimum
>>>>>>>> numbers of delete files to trigger a compaction. However, that would result
>>>>>>>> in very frequent compaction of full partition if people add many global
>>>>>>>> delete files. A Convert of global equality deletes to partition
>>>>>>>> position deletes while maintaining the same sequence number can be used to
>>>>>>>> solve the issue. Currently there is no way to write files with a custom
>>>>>>>> sequence number. This functionality needs to be added.
>>>>>>>>
>>>>>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>>>>>>> https://github.com/apache/iceberg/pull/2841.
>>>>>>>>
>>>>>>>> 5. we had some discussion around the separation of row and
>>>>>>>> partition level filters. The general direction in the meeting is to just
>>>>>>>> have a single filter method. We will sync offline to reach an agreement.
>>>>>>>>
>>>>>>>> 6. people raised the issue that if new delete files are added to a
>>>>>>>> data file while a Merge is going on, then the Merge would fail. That causes
>>>>>>>> huge performance issues for CDC streaming use cases and Merge is very hard
>>>>>>>> to succeed. There are 2 proposed solutions:
>>>>>>>>   (1) for hot partitions, users can try to only perform Convert and
>>>>>>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>>>>>>> becomes cold and a Merge can be performed safely.
>>>>>>>>   (2) it looks like we need a Merge strategy that does not do any
>>>>>>>> bin-packing, and only merges the delete files for each data file and writes
>>>>>>>> it back. The new data file will have the same sequence number as the old
>>>>>>>> file before Merge. By doing so, new delete files can still be applied
>>>>>>>> safely and the compaction can succeed without concerns around conflict. The
>>>>>>>> caveat is that this does not work for position deletes because the row
>>>>>>>> position changes for each file after Merge. But for the CDC streaming use
>>>>>>>> case it is acceptable to only write equality deletes, so this looks like a
>>>>>>>> feasible approach.
>>>>>>>>
>>>>>>>> 7. people raised the concern about the memory consumption issue for
>>>>>>>> the is_deleted metadata column. We ran out of time and will continue the
>>>>>>>> discussion offline on Slack.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jack Ye
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <ye...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> We are planning to have a meeting to discuss the design of Iceberg
>>>>>>>>> delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>>>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>>>>>
>>>>>>>>> We have also created the channel #compaction on Slack, please join
>>>>>>>>> the channel for daily discussions if you are interested in the progress.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jack Ye
>>>>>>>>>
>>>>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <ye...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> As there are more and more people adopting the v2 spec, we are
>>>>>>>>>> seeing an increasing number of requests for delete compaction support.
>>>>>>>>>>
>>>>>>>>>> Here is a document discussing the use cases and basic interface
>>>>>>>>>> design for it to get the community aligned around what compactions we would
>>>>>>>>>> offer and how the interfaces would be divided:
>>>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>>>
>>>>>>>>>> Any feedback would be appreciated!
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jack Ye
>>>>>>>>>>
>>>>>>>>>

Re: Iceberg Delete Compaction Interface Design

Posted by Jack Ye <ye...@gmail.com>.
> I think even with the custom sequence file numbers on output data files;
the position delete files have to be deleted; *since position deletes also
apply on data files with the same sequence number*. Also, unless I am
missing something, I think the equality delete files cannot be deleted at
the end of this compaction, as it is really hard to figure out if all the
impacted data files have been rewritten:

The plan is to always remove both position and equality deletes. Given a
predicate (e.g. COMPACT table WHERE region='US'), the initial
implementation will always compact full partitions by (1) looking for all
delete files matching the predicate, (2) getting all impacted partitions,
(3) rewriting all data files in those partitions that have deletes, and (4)
removing those delete files. The algorithm can be improved to target a
smaller subset of files, but currently we mostly rely on Iceberg's scan
planning, and as you said it's hard to figure out whether a delete file
(especially an equality delete) covers any additional data files. But we
know each delete file belongs to only a single partition, which guarantees
the removal is safe. (For partitioned tables, global deletes will be
handled separately and not removed unless specifically requested by the
user, because that requires a full-table compaction; but CDC does not write
global deletes anyway.)
-Jack



On Tue, Nov 2, 2021 at 4:10 PM Puneet Zaroo <pz...@netflix.com.invalid>
wrote:

> Thanks for the clarifications; and thanks for pulling together the
> documentation for the row-level delete functionality separately; as that
> will be very helpful.
> I think we are in agreement on most points. I just want to reiterate my
> understanding of the merge compaction behavior to make sure we are on the
> same page.
>
> The output table of a Flink CDC pipeline will in a lot of cases have small
> files with unsorted data; so doing the bin-pack and sorting compactions is
> also important for those tables (and obviously being able to do so without
> conflicts with incoming data is also important). If the existing bin-pack
> and sort compaction actions are enhanced with
>
> 1) writing the output data files with the sequence number of the snapshot
> with which the compaction was started with *AND *
> 2) deleting all position delete files whose data files have been
> rewritten;
>
> then I believe we can run those (bin-pack and sort) compactions as well
> without fear of conflicting with newer CDC updates. I think even with the
> custom sequence file numbers on output data files; the position delete
> files have to be deleted; *since position deletes also apply on data
> files with the same sequence number*. Also, unless I am missing
> something, I think the equality delete files cannot be deleted at the end
> of this compaction, as it is really hard to figure out if all the impacted
> data files have been rewritten.
>
> If the bin-pack and sort compactions are enhanced in this manner; then I
> foresee just running those so that the same compaction can take care of all
> relevant optimizations for a table (including delete file removal). At the
> end, potentially, only some unnecessary equality delete files will be
> remaining which will have to be deleted by some other action.
>
> Thanks,
> - Puneet
>
>
>
>
> On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <ye...@gmail.com> wrote:
>
>> > why can't this strategy do bin-packing or sorting as well; if that is
>> required; as long as the sequence number is not updated.
>> > wouldn't subsequent reads re-apply the delete files which were used in
>> the merge as well?
>>
>> I think you are right, we can use the sequence number of the snapshot we
>> start compaction as the new sequence number of all rewritten files because
>> all the deletes of older sequence numbers related to the file have been
>> applied. (correct me if I missed anything here)
>> But from the correctness perspective it's the same because we remove
>> those delete files as a part of the compaction so we will not really
>> reapply those deletes.
>> With this improvement, we can now also do bin-packing easily. (If we
>> preserve all sequence numbers, we can still do bin-packing for files
>> sharing the same sequence number, what I described was just the easiest way
>> to ensure sequence number preservation)
>>
>> > for the tables only being written into by a Flink CDC pipeline, this
>> should not happen as position deletes are only created for in-progress
>> (uncommitted) data files, correct ?
>>
>> Correct
>>
>> > I believe for the CDC use case it is hard to guarantee that  that
>> partitions will turn cold and can be merged without conflicts, as 'hotness'
>> is a factor of mutation rate in the source DB
>>
>> I agree. When I say hot/cold I am mostly referring to those
>> time-partitioned use cases with clear hot-cold division of data. But in the
>> end the principle is that the compaction strategy should be determined by
>> the characteristics of the partition. If all the partitions are always with
>> high traffic, then I think merge with preserved sequence number seems like
>> the way to go for the entire table.
>>
>> I have recently summarized all the related concepts in
>> https://github.com/apache/iceberg/pull/3432, it would be great if you
>> can take a look about anything else I missed, thanks!
>>
>> Best,
>> Jack Ye
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo <pz...@netflix.com.invalid>
>> wrote:
>>
>>> Another follow-up regarding this : *"Merge strategy that does not do
>>> any bin-packing, and only merges the delete files for each data file and
>>> writes it back. The new data file will have the same sequence number as the
>>> old file before Merge"* ; shouldn't the sequence number be set to the
>>> highest sequence number of any applied delete file to the data file. If the
>>> sequence number of the data file is not changed at-all, wouldn't subsequent
>>> reads re-apply the delete files which were used in the merge as well?
>>>
>>> thanks
>>>
>>> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pz...@netflix.com> wrote:
>>>
>>>> I had a few follow-up points.
>>>>
>>>> 1 *"(1) for hot partitions, users can try to only perform Convert and
>>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>>> becomes cold and a Merge can be performed safely.".* : I believe for
>>>> the CDC use case it is hard to guarantee that  that partitions will turn
>>>> cold and can be merged without conflicts, as 'hotness' is a factor of
>>>> mutation rate in the source DB; and perhaps some partitions are always
>>>> "hot"; so in essence the following:  *"Merge strategy that does not do
>>>> any bin-packing, and only merges the delete files for each data file and
>>>> writes it back. The new data file will have the same sequence number as the
>>>> old file before Merge"* seems important. Though as a follow-up I am
>>>> wondering why can't this strategy do bin-packing or sorting as well; if
>>>> that is required; as long as the sequence number is not updated.
>>>>
>>>> 2 *"During the commit validation phase of a Merge operation, we need
>>>> to verify that for each data file that would be removed, there are no new
>>>> position deletes with higher sequence number added."* : Just to be
>>>> clear; for the tables only being written into by a Flink CDC pipeline, this
>>>> should not happen as position deletes are only created for in-progress
>>>> (uncommitted) data files, correct ?
>>>>
>>>> Thanks and regards,
>>>> - Puneet
>>>>
>>>>
>>>>
>>>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <ye...@gmail.com> wrote:
>>>>
>>>>> Had some offline discussions on Slack and WeChat.
>>>>>
>>>>> For Russell's point, we are reconfirming with related people on Slack,
>>>>> and will post updates once we have an agreement.
>>>>>
>>>>> Regarding point 6, for Flink CDC the data file flushed to disk might
>>>>> be associated with position deletes, but after the flush all deletes will
>>>>> be equality deletes, so 6-2 still works. After all, as long as data files
>>>>> for position deletes are not removed, the process should be able to succeed
>>>>> with optimistic retry. Currently we are missing the following that needs to
>>>>> be worked on to resolve the CDC performance issue:
>>>>> 1. We need to support setting the sequence number for individual
>>>>> content files.
>>>>> 2. During the commit validation phase of a Merge operation, we need to
>>>>> verify that for each data file that would be removed, there are no new
>>>>> position deletes with higher sequence number added. If detected, merge of
>>>>> that file has to be completely retried (we can support incremental progress
>>>>> for this).
>>>>>
>>>>> -Jack
>>>>>
>>>>>
>>>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>>>>> russell.spitzer@gmail.com> wrote:
>>>>>
>>>>>> I think I understood the Rewrite strategy discussion a little
>>>>>> differently
>>>>>>
>>>>>> Binpack Strategy and SortStrategy each get a new flag which lets you
>>>>>> pick files based on their number of delete files. So basically you can set
>>>>>> a variety of parameters, small files, large files, files with deletes etc
>>>>>> ...
>>>>>>
>>>>>> A new strategy is added which determines which file to rewrite by
>>>>>> looking for all files currently touched by delete files. Instead of looking
>>>>>> through files with X deletes, we look up all files affected by deletes and
>>>>>> rewrite them. Although now as I write this it's basically running the above
>>>>>> strategies with number of delete files >= 1 and files per group at 1. So
>>>>>> maybe it doesn't need another strategy?
>>>>>>
>>>>>> But maybe I got that wrong ...
>>>>>>
>>>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks to everyone who came to the meeting.
>>>>>>>
>>>>>>> Here is the full meeting recording I made:
>>>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>>>>
>>>>>>> Here are some key takeaways:
>>>>>>>
>>>>>>> 1. we generally agreed upon the division of compactions into
>>>>>>> Rewrite, Convert and Merge.
>>>>>>>
>>>>>>> 2. Merge will be implemented through RewriteDataFiles as proposed in
>>>>>>> https://github.com/apache/iceberg/pull/3207, but instead as a new
>>>>>>> strategy by extending the existing BinPackStrategy. For users who would
>>>>>>> also like to run sort during Merge, we will have another delete strategy
>>>>>>> that extends the SortStrategy.
>>>>>>>
>>>>>>> 3. Merge can have an option that allows users to set the minimum
>>>>>>> numbers of delete files to trigger a compaction. However, that would result
>>>>>>> in very frequent compaction of full partition if people add many global
>>>>>>> delete files. A Convert of global equality deletes to partition
>>>>>>> position deletes while maintaining the same sequence number can be used to
>>>>>>> solve the issue. Currently there is no way to write files with a custom
>>>>>>> sequence number. This functionality needs to be added.
>>>>>>>
>>>>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>>>>>> https://github.com/apache/iceberg/pull/2841.
>>>>>>>
>>>>>>> 5. we had some discussion around the separation of row and partition
>>>>>>> level filters. The general direction in the meeting is to just have a
>>>>>>> single filter method. We will sync offline to reach an agreement.
>>>>>>>
>>>>>>> 6. people raised the issue that if new delete files are added to a
>>>>>>> data file while a Merge is going on, then the Merge would fail. That causes
>>>>>>> huge performance issues for CDC streaming use cases and Merge is very hard
>>>>>>> to succeed. There are 2 proposed solutions:
>>>>>>>   (1) for hot partitions, users can try to only perform Convert and
>>>>>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>>>>>> becomes cold and a Merge can be performed safely.
>>>>>>>   (2) it looks like we need a Merge strategy that does not do any
>>>>>>> bin-packing, and only merges the delete files for each data file and writes
>>>>>>> it back. The new data file will have the same sequence number as the old
>>>>>>> file before Merge. By doing so, new delete files can still be applied
>>>>>>> safely and the compaction can succeed without concerns around conflict. The
>>>>>>> caveat is that this does not work for position deletes because the row
>>>>>>> position changes for each file after Merge. But for the CDC streaming use
>>>>>>> case it is acceptable to only write equality deletes, so this looks like a
>>>>>>> feasible approach.
>>>>>>>
>>>>>>> 7. people raised the concern about the memory consumption issue for
>>>>>>> the is_deleted metadata column. We ran out of time and will continue the
>>>>>>> discussion offline on Slack.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jack Ye
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> We are planning to have a meeting to discuss the design of Iceberg
>>>>>>>> delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>>>>
>>>>>>>> We have also created the channel #compaction on Slack, please join
>>>>>>>> the channel for daily discussions if you are interested in the progress.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jack Ye
>>>>>>>>
>>>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <ye...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> As there are more and more people adopting the v2 spec, we are
>>>>>>>>> seeing an increasing number of requests for delete compaction support.
>>>>>>>>>
>>>>>>>>> Here is a document discussing the use cases and basic interface
>>>>>>>>> design for it to get the community aligned around what compactions we would
>>>>>>>>> offer and how the interfaces would be divided:
>>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>>
>>>>>>>>> Any feedback would be appreciated!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jack Ye
>>>>>>>>>
>>>>>>>>

Re: Iceberg Delete Compaction Interface Design

Posted by Puneet Zaroo <pz...@netflix.com.INVALID>.
Thanks for the clarifications, and thanks for pulling together the
documentation for the row-level delete functionality separately; that
will be very helpful.
I think we are in agreement on most points. I just want to reiterate my
understanding of the merge compaction behavior to make sure we are on the
same page.

The output table of a Flink CDC pipeline will in a lot of cases have small
files with unsorted data, so doing the bin-pack and sorting compactions is
also important for those tables (and obviously being able to do so without
conflicts with incoming data is also important). If the existing bin-pack
and sort compaction actions are enhanced with

1) writing the output data files with the sequence number of the snapshot
with which the compaction was started, *AND*
2) deleting all position delete files whose data files have been rewritten;

then I believe we can run those (bin-pack and sort) compactions as well
without fear of conflicting with newer CDC updates. I think even with the
custom sequence numbers on the output data files, the position delete files
still have to be deleted, *since position deletes also apply to data files
with the same sequence number*. Also, unless I am missing something, I
think the equality delete files cannot be deleted at the end of this
compaction, as it is really hard to figure out whether all the impacted
data files have been rewritten.
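
For clarity, these are the sequence-number rules I am relying on, per my
reading of the v2 spec (expressed as a toy helper rather than any Iceberg
API): a rewritten data file stamped with the compaction snapshot's sequence
number can still be matched by an old position delete at that same sequence
number, which is why those position deletes must be removed, while an
equality delete at the same sequence number no longer applies:

    public class SequenceNumberRules {
      // Position deletes apply to data files with the same or a lower
      // sequence number.
      static boolean positionDeleteApplies(long deleteSeq, long dataSeq) {
        return deleteSeq >= dataSeq;
      }

      // Equality deletes apply only to data files with a strictly lower
      // sequence number.
      static boolean equalityDeleteApplies(long deleteSeq, long dataSeq) {
        return deleteSeq > dataSeq;
      }
    }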

If the bin-pack and sort compactions are enhanced in this manner, then I
foresee just running those, so that the same compaction can take care of
all relevant optimizations for a table (including delete file removal). At
the end, potentially, only some unnecessary equality delete files will
remain, and those will have to be deleted by some other action.

Thanks,
- Puneet




On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <ye...@gmail.com> wrote:

> > why can't this strategy do bin-packing or sorting as well; if that is
> required; as long as the sequence number is not updated.
> > wouldn't subsequent reads re-apply the delete files which were used in
> the merge as well?
>
> I think you are right, we can use the sequence number of the snapshot we
> start compaction as the new sequence number of all rewritten files because
> all the deletes of older sequence numbers related to the file have been
> applied. (correct me if I missed anything here)
> But from the correctness perspective it's the same because we remove those
> delete files as a part of the compaction so we will not really reapply
> those deletes.
> With this improvement, we can now also do bin-packing easily. (If we
> preserve all sequence numbers, we can still do bin-packing for files
> sharing the same sequence number, what I described was just the easiest way
> to ensure sequence number preservation)
>
> > for the tables only being written into by a Flink CDC pipeline, this
> should not happen as position deletes are only created for in-progress
> (uncommitted) data files, correct ?
>
> Correct
>
> > I believe for the CDC use case it is hard to guarantee that  that
> partitions will turn cold and can be merged without conflicts, as 'hotness'
> is a factor of mutation rate in the source DB
>
> I agree. When I say hot/cold I am mostly referring to those
> time-partitioned use cases with clear hot-cold division of data. But in the
> end the principle is that the compaction strategy should be determined by
> the characteristics of the partition. If all the partitions are always with
> high traffic, then I think merge with preserved sequence number seems like
> the way to go for the entire table.
>
> I have recently summarized all the related concepts in
> https://github.com/apache/iceberg/pull/3432, it would be great if you can
> take a look about anything else I missed, thanks!
>
> Best,
> Jack Ye
>
>
>
>
>
>
>
> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo <pz...@netflix.com.invalid>
> wrote:
>
>> Another follow-up regarding this : *"Merge strategy that does not do any
>> bin-packing, and only merges the delete files for each data file and writes
>> it back. The new data file will have the same sequence number as the old
>> file before Merge"* ; shouldn't the sequence number be set to the
>> highest sequence number of any applied delete file to the data file. If the
>> sequence number of the data file is not changed at-all, wouldn't subsequent
>> reads re-apply the delete files which were used in the merge as well?
>>
>> thanks
>>
>> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pz...@netflix.com> wrote:
>>
>>> I had a few follow-up points.
>>>
>>> 1 *"(1) for hot partitions, users can try to only perform Convert and
>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>> becomes cold and a Merge can be performed safely.".* : I believe for
>>> the CDC use case it is hard to guarantee that  that partitions will turn
>>> cold and can be merged without conflicts, as 'hotness' is a factor of
>>> mutation rate in the source DB; and perhaps some partitions are always
>>> "hot"; so in essence the following:  *"Merge strategy that does not do
>>> any bin-packing, and only merges the delete files for each data file and
>>> writes it back. The new data file will have the same sequence number as the
>>> old file before Merge"* seems important. Though as a follow-up I am
>>> wondering why can't this strategy do bin-packing or sorting as well; if
>>> that is required; as long as the sequence number is not updated.
>>>
>>> 2 *"During the commit validation phase of a Merge operation, we need to
>>> verify that for each data file that would be removed, there are no new
>>> position deletes with higher sequence number added."* : Just to be
>>> clear; for the tables only being written into by a Flink CDC pipeline, this
>>> should not happen as position deletes are only created for in-progress
>>> (uncommitted) data files, correct ?
>>>
>>> Thanks and regards,
>>> - Puneet
>>>
>>>
>>>
>>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <ye...@gmail.com> wrote:
>>>
>>>> Had some offline discussions on Slack and WeChat.
>>>>
>>>> For Russell's point, we are reconfirming with related people on Slack,
>>>> and will post updates once we have an agreement.
>>>>
>>>> Regarding point 6, for Flink CDC the data file flushed to disk might be
>>>> associated with position deletes, but after the flush all deletes will be
>>>> equality deletes, so 6-2 still works. After all, as long as data files for
>>>> position deletes are not removed, the process should be able to succeed
>>>> with optimistic retry. Currently we are missing the following that needs to
>>>> be worked on to resolve the CDC performance issue:
>>>> 1. We need to support setting the sequence number for individual
>>>> content files.
>>>> 2. During the commit validation phase of a Merge operation, we need to
>>>> verify that for each data file that would be removed, there are no new
>>>> position deletes with higher sequence number added. If detected, merge of
>>>> that file has to be completely retried (we can support incremental progress
>>>> for this).
>>>>
>>>> -Jack
>>>>
>>>>
>>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>>>> russell.spitzer@gmail.com> wrote:
>>>>
>>>>> I think I understood the Rewrite strategy discussion a little
>>>>> differently
>>>>>
>>>>> Binpack Strategy and SortStrategy each get a new flag which lets you
>>>>> pick files based on their number of delete files. So basically you can set
>>>>> a variety of parameters, small files, large files, files with deletes etc
>>>>> ...
>>>>>
>>>>> A new strategy is added which determines which file to rewrite by
>>>>> looking for all files currently touched by delete files. Instead of looking
>>>>> through files with X deletes, we look up all files affected by deletes and
>>>>> rewrite them. Although now as I write this it's basically running the above
>>>>> strategies with number of delete files >= 1 and files per group at 1. So
>>>>> maybe it doesn't need another strategy?
>>>>>
>>>>> But maybe I got that wrong ...
>>>>>
>>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>
>>>>>> Thanks to everyone who came to the meeting.
>>>>>>
>>>>>> Here is the full meeting recording I made:
>>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>>>
>>>>>> Here are some key takeaways:
>>>>>>
>>>>>> 1. we generally agreed upon the division of compactions into Rewrite,
>>>>>> Convert and Merge.
>>>>>>
>>>>>> 2. Merge will be implemented through RewriteDataFiles as proposed in
>>>>>> https://github.com/apache/iceberg/pull/3207, but instead as a new
>>>>>> strategy by extending the existing BinPackStrategy. For users who would
>>>>>> also like to run sort during Merge, we will have another delete strategy
>>>>>> that extends the SortStrategy.
>>>>>>
>>>>>> 3. Merge can have an option that allows users to set the minimum
>>>>>> numbers of delete files to trigger a compaction. However, that would result
>>>>>> in very frequent compaction of full partition if people add many global
>>>>>> delete files. A Convert of global equality deletes to partition
>>>>>> position deletes while maintaining the same sequence number can be used to
>>>>>> solve the issue. Currently there is no way to write files with a custom
>>>>>> sequence number. This functionality needs to be added.
>>>>>>
>>>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>>>>> https://github.com/apache/iceberg/pull/2841.
>>>>>>
>>>>>> 5. we had some discussion around the separation of row and partition
>>>>>> level filters. The general direction in the meeting is to just have a
>>>>>> single filter method. We will sync offline to reach an agreement.
>>>>>>
>>>>>> 6. people raised the issue that if new delete files are added to a
>>>>>> data file while a Merge is going on, then the Merge would fail. That causes
>>>>>> huge performance issues for CDC streaming use cases and Merge is very hard
>>>>>> to succeed. There are 2 proposed solutions:
>>>>>>   (1) for hot partitions, users can try to only perform Convert and
>>>>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>>>>> becomes cold and a Merge can be performed safely.
>>>>>>   (2) it looks like we need a Merge strategy that does not do any
>>>>>> bin-packing, and only merges the delete files for each data file and writes
>>>>>> it back. The new data file will have the same sequence number as the old
>>>>>> file before Merge. By doing so, new delete files can still be applied
>>>>>> safely and the compaction can succeed without concerns around conflict. The
>>>>>> caveat is that this does not work for position deletes because the row
>>>>>> position changes for each file after Merge. But for the CDC streaming use
>>>>>> case it is acceptable to only write equality deletes, so this looks like a
>>>>>> feasible approach.
>>>>>>
>>>>>> 7. people raised the concern about the memory consumption issue for
>>>>>> the is_deleted metadata column. We ran out of time and will continue the
>>>>>> discussion offline on Slack.
>>>>>>
>>>>>> Best,
>>>>>> Jack Ye
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> We are planning to have a meeting to discuss the design of Iceberg
>>>>>>> delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>>>
>>>>>>> We have also created the channel #compaction on Slack, please join
>>>>>>> the channel for daily discussions if you are interested in the progress.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jack Ye
>>>>>>>
>>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <ye...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> As there are more and more people adopting the v2 spec, we are
>>>>>>>> seeing an increasing number of requests for delete compaction support.
>>>>>>>>
>>>>>>>> Here is a document discussing the use cases and basic interface
>>>>>>>> design for it to get the community aligned around what compactions we would
>>>>>>>> offer and how the interfaces would be divided:
>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>
>>>>>>>> Any feedback would be appreciated!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jack Ye
>>>>>>>>
>>>>>>>

Re: Iceberg Delete Compaction Interface Design

Posted by Jack Ye <ye...@gmail.com>.
> why can't this strategy do bin-packing or sorting as well; if that is
required; as long as the sequence number is not updated.
> wouldn't subsequent reads re-apply the delete files which were used in
the merge as well?

I think you are right: we can use the sequence number of the snapshot at
which we start the compaction as the new sequence number of all rewritten
files, because all deletes with older sequence numbers that relate to those
files have already been applied. (Correct me if I missed anything here.)
Either way, from the correctness perspective the result is the same, because
we remove those delete files as part of the compaction, so we will not
actually re-apply those deletes.
With this improvement, we can now also do bin-packing easily. (If we
preserve all sequence numbers, we can still do bin-packing for files sharing
the same sequence number; what I described was just the easiest way to
ensure sequence number preservation.)
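
A minimal sketch of why this is safe, assuming the v2 applicability rules
(an equality delete applies to data files with a strictly smaller data
sequence number; a position delete applies to files with a smaller-or-equal
one). The class and the numbers below are hypothetical, not the Iceberg
internals:

// DeleteApplicability.java -- illustrative only, not the Iceberg implementation.
public class DeleteApplicability {

  // v2 rule: an equality delete applies to data files written before it,
  // i.e. with a strictly smaller data sequence number.
  static boolean equalityDeleteApplies(long dataSeq, long deleteSeq) {
    return dataSeq < deleteSeq;
  }

  // v2 rule: a position delete applies to data files with the same or a
  // smaller data sequence number, since it targets concrete row offsets.
  static boolean positionDeleteApplies(long dataSeq, long deleteSeq) {
    return dataSeq <= deleteSeq;
  }

  public static void main(String[] args) {
    long oldDataSeq = 3;         // data file before Merge
    long appliedDeleteSeq = 5;   // equality delete already applied by the Merge
    long compactionStartSeq = 7; // sequence number of the snapshot at compaction start
    long laterDeleteSeq = 8;     // equality delete committed after the Merge started

    // Before Merge, the delete applies to the data file.
    assert equalityDeleteApplies(oldDataSeq, appliedDeleteSeq);

    // Stamp the rewritten file with the compaction-start sequence number:
    // deletes already folded in (seq <= 7) no longer match it, while deletes
    // committed afterwards (seq 8) still do.
    assert !equalityDeleteApplies(compactionStartSeq, appliedDeleteSeq);
    assert equalityDeleteApplies(compactionStartSeq, laterDeleteSeq);
  }
}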

> for the tables only being written into by a Flink CDC pipeline, this
should not happen as position deletes are only created for in-progress
(uncommitted) data files, correct?

Correct

> I believe for the CDC use case it is hard to guarantee that
partitions will turn cold and can be merged without conflicts, as 'hotness'
is a factor of mutation rate in the source DB

I agree. When I say hot/cold I am mostly referring to time-partitioned use
cases with a clear hot-cold division of data. But in the end the principle
is that the compaction strategy should be determined by the characteristics
of the partition. If all partitions always have high traffic, then I think
Merge with preserved sequence numbers seems like the way to go for the
entire table.
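
As an illustration only (the enum, metric and thresholds below are made up,
not an Iceberg API), the per-partition choice could look like:

// StrategyChooser.java -- hypothetical helper, just to illustrate the principle.
public class StrategyChooser {
  enum DeleteCompactionKind { CONVERT_AND_REWRITE, MERGE_PRESERVING_SEQUENCE, FULL_MERGE }

  // Any measure of partition churn would do; delete commits per day is made up.
  static DeleteCompactionKind choose(long deleteCommitsPerDay) {
    if (deleteCommitsPerDay == 0) {
      return DeleteCompactionKind.FULL_MERGE;              // cold: merge and drop the delete files
    }
    if (deleteCommitsPerDay < 10) {
      return DeleteCompactionKind.CONVERT_AND_REWRITE;     // warm: keep delete file size and count manageable
    }
    return DeleteCompactionKind.MERGE_PRESERVING_SEQUENCE; // always hot: merge without changing sequence numbers
  }
}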

I have recently summarized all the related concepts in
https://github.com/apache/iceberg/pull/3432; it would be great if you could
take a look and let me know if I missed anything else, thanks!

Best,
Jack Ye







On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo <pz...@netflix.com.invalid>
wrote:

> Another follow-up regarding this: *"Merge strategy that does not do any
> bin-packing, and only merges the delete files for each data file and writes
> it back. The new data file will have the same sequence number as the old
> file before Merge"*: shouldn't the sequence number be set to the highest
> sequence number of any delete file applied to the data file? If the
> sequence number of the data file is not changed at all, wouldn't subsequent
> reads re-apply the delete files which were used in the merge as well?
>
> thanks
>
> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pz...@netflix.com> wrote:
>
>> I had a few follow-up points.
>>
>> 1 *"(1) for hot partitions, users can try to only perform Convert and
>> Rewrite to keep delete file sizes and count manageable, until the partition
>> becomes cold and a Merge can be performed safely.".* : I believe for the
>> CDC use case it is hard to guarantee that partitions will turn cold
>> and can be merged without conflicts, as 'hotness' is a factor of mutation
>> rate in the source DB; and perhaps some partitions are always "hot"; so in
>> essence the following:  *"Merge strategy that does not do any
>> bin-packing, and only merges the delete files for each data file and writes
>> it back. The new data file will have the same sequence number as the old
>> file before Merge"* seems important. Though as a follow-up I am
>> wondering why can't this strategy do bin-packing or sorting as well; if
>> that is required; as long as the sequence number is not updated.
>>
>> 2 *"During the commit validation phase of a Merge operation, we need to
>> verify that for each data file that would be removed, there are no new
>> position deletes with higher sequence number added."* : Just to be
>> clear; for the tables only being written into by a Flink CDC pipeline, this
>> should not happen as position deletes are only created for in-progress
>> (uncommitted) data files, correct?
>>
>> Thanks and regards,
>> - Puneet
>>
>>
>>
>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <ye...@gmail.com> wrote:
>>
>>> Had some offline discussions on Slack and WeChat.
>>>
>>> For Russell's point, we are reconfirming with related people on Slack,
>>> and will post updates once we have an agreement.
>>>
>>> Regarding point 6, for Flink CDC the data file flushed to disk might be
>>> associated with position deletes, but after the flush all deletes will be
>>> equality deletes, so 6-2 still works. After all, as long as data files for
>>> position deletes are not removed, the process should be able to succeed
>>> with optimistic retry. Currently we are missing the following that needs to
>>> be worked on to resolve the CDC performance issue:
>>> 1. We need to support setting the sequence number for individual content
>>> files.
>>> 2. During the commit validation phase of a Merge operation, we need to
>>> verify that for each data file that would be removed, there are no new
>>> position deletes with higher sequence number added. If detected, merge of
>>> that file has to be completely retried (we can support incremental progress
>>> for this).
>>>
>>> -Jack
>>>
>>>
>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>>> russell.spitzer@gmail.com> wrote:
>>>
>>>> I think I understood the Rewrite strategy discussion a little
>>>> differently
>>>>
>>>> Binpack Strategy and SortStrategy each get a new flag which lets you
>>>> pick files based on their number of delete files. So basically you can set
>>>> a variety of parameters, small files, large files, files with deletes etc
>>>> ...
>>>>
>>>> A new strategy is added which determines which file to rewrite by
>>>> looking for all files currently touched by delete files. Instead of looking
>>>> through files with X deletes, we look up all files affected by deletes and
>>>> rewrite them. Although now as I write this it's basically running the above
>>>> strategies with number of delete files >= 1 and files per group at 1. So
>>>> maybe it doesn't need another strategy?
>>>>
>>>> But maybe I got that wrong ...
>>>>
>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <ye...@gmail.com> wrote:
>>>>
>>>>> Thanks to everyone who came to the meeting.
>>>>>
>>>>> Here is the full meeting recording I made:
>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>>
>>>>> Here are some key takeaways:
>>>>>
>>>>> 1. we generally agreed upon the division of compactions into Rewrite,
>>>>> Convert and Merge.
>>>>>
>>>>> 2. Merge will be implemented through RewriteDataFiles as proposed in
>>>>> https://github.com/apache/iceberg/pull/3207, but instead as a new
>>>>> strategy by extending the existing BinPackStrategy. For users who would
>>>>> also like to run sort during Merge, we will have another delete strategy
>>>>> that extends the SortStrategy.
>>>>>
>>>>> 3. Merge can have an option that allows users to set the minimum
>>>>> numbers of delete files to trigger a compaction. However, that would result
>>>>> in very frequent compaction of full partition if people add many global
>>>>> delete files. A Convert of global equality deletes to partition
>>>>> position deletes while maintaining the same sequence number can be used to
>>>>> solve the issue. Currently there is no way to write files with a custom
>>>>> sequence number. This functionality needs to be added.
>>>>>
>>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>>>> https://github.com/apache/iceberg/pull/2841.
>>>>>
>>>>> 5. we had some discussion around the separation of row and partition
>>>>> level filters. The general direction in the meeting is to just have a
>>>>> single filter method. We will sync offline to reach an agreement.
>>>>>
>>>>> 6. people raised the issue that if new delete files are added to a
>>>>> data file while a Merge is going on, then the Merge would fail. That causes
>>>>> huge performance issues for CDC streaming use cases and Merge is very hard
>>>>> to succeed. There are 2 proposed solutions:
>>>>>   (1) for hot partitions, users can try to only perform Convert and
>>>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>>>> becomes cold and a Merge can be performed safely.
>>>>>   (2) it looks like we need a Merge strategy that does not do any
>>>>> bin-packing, and only merges the delete files for each data file and writes
>>>>> it back. The new data file will have the same sequence number as the old
>>>>> file before Merge. By doing so, new delete files can still be applied
>>>>> safely and the compaction can succeed without concerns around conflict. The
>>>>> caveat is that this does not work for position deletes because the row
>>>>> position changes for each file after Merge. But for the CDC streaming use
>>>>> case it is acceptable to only write equality deletes, so this looks like a
>>>>> feasible approach.
>>>>>
>>>>> 7. people raised the concern about the memory consumption issue for
>>>>> the is_deleted metadata column. We ran out of time and will continue the
>>>>> discussion offline on Slack.
>>>>>
>>>>> Best,
>>>>> Jack Ye
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> We are planning to have a meeting to discuss the design of Iceberg
>>>>>> delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>>
>>>>>> We have also created the channel #compaction on Slack, please join
>>>>>> the channel for daily discussions if you are interested in the progress.
>>>>>>
>>>>>> Best,
>>>>>> Jack Ye
>>>>>>
>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> As there are more and more people adopting the v2 spec, we are
>>>>>>> seeing an increasing number of requests for delete compaction support.
>>>>>>>
>>>>>>> Here is a document discussing the use cases and basic interface
>>>>>>> design for it to get the community aligned around what compactions we would
>>>>>>> offer and how the interfaces would be divided:
>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>
>>>>>>> Any feedback would be appreciated!
>>>>>>>
>>>>>>> Best,
>>>>>>> Jack Ye
>>>>>>>
>>>>>>

Re: Iceberg Delete Compaction Interface Design

Posted by Puneet Zaroo <pz...@netflix.com.INVALID>.
Another follow-up regarding this: *"Merge strategy that does not do any
bin-packing, and only merges the delete files for each data file and writes
it back. The new data file will have the same sequence number as the old
file before Merge"*: shouldn't the sequence number be set to the highest
sequence number of any delete file applied to the data file? If the
sequence number of the data file is not changed at all, wouldn't subsequent
reads re-apply the delete files which were used in the merge as well?

thanks

On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pz...@netflix.com> wrote:

> I had a few follow-up points.
>
> 1 *"(1) for hot partitions, users can try to only perform Convert and
> Rewrite to keep delete file sizes and count manageable, until the partition
> becomes cold and a Merge can be performed safely.".* : I believe for the
> CDC use case it is hard to guarantee that partitions will turn cold
> and can be merged without conflicts, as 'hotness' is a factor of mutation
> rate in the source DB; and perhaps some partitions are always "hot"; so in
> essence the following:  *"Merge strategy that does not do any
> bin-packing, and only merges the delete files for each data file and writes
> it back. The new data file will have the same sequence number as the old
> file before Merge"* seems important. Though as a follow-up I am wondering
> why can't this strategy do bin-packing or sorting as well; if that is
> required; as long as the sequence number is not updated.
>
> 2 *"During the commit validation phase of a Merge operation, we need to
> verify that for each data file that would be removed, there are no new
> position deletes with higher sequence number added."* : Just to be clear;
> for the tables only being written into by a Flink CDC pipeline, this should
> not happen as position deletes are only created for in-progress
> (uncommitted) data files, correct?
>
> Thanks and regards,
> - Puneet
>
>
>
> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <ye...@gmail.com> wrote:
>
>> Had some offline discussions on Slack and WeChat.
>>
>> For Russell's point, we are reconfirming with related people on Slack,
>> and will post updates once we have an agreement.
>>
>> Regarding point 6, for Flink CDC the data file flushed to disk might be
>> associated with position deletes, but after the flush all deletes will be
>> equality deletes, so 6-2 still works. After all, as long as data files for
>> position deletes are not removed, the process should be able to succeed
>> with optimistic retry. Currently we are missing the following that needs to
>> be worked on to resolve the CDC performance issue:
>> 1. We need to support setting the sequence number for individual content
>> files.
>> 2. During the commit validation phase of a Merge operation, we need to
>> verify that for each data file that would be removed, there are no new
>> position deletes with higher sequence number added. If detected, merge of
>> that file has to be completely retried (we can support incremental progress
>> for this).
>>
>> -Jack
>>
>>
>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>> russell.spitzer@gmail.com> wrote:
>>
>>> I think I understood the Rewrite strategy discussion a little differently
>>>
>>> Binpack Strategy and SortStrategy each get a new flag which lets you
>>> pick files based on their number of delete files. So basically you can set
>>> a variety of parameters, small files, large files, files with deletes etc
>>> ...
>>>
>>> A new strategy is added which determines which file to rewrite by
>>> looking for all files currently touched by delete files. Instead of looking
>>> through files with X deletes, we look up all files affected by deletes and
>>> rewrite them. Although now as I write this it's basically running the above
>>> strategies with number of delete files >= 1 and files per group at 1. So
>>> maybe it doesn't need another strategy?
>>>
>>> But maybe I got that wrong ...
>>>
>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <ye...@gmail.com> wrote:
>>>
>>>> Thanks to everyone who came to the meeting.
>>>>
>>>> Here is the full meeting recording I made:
>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>
>>>> Here are some key takeaways:
>>>>
>>>> 1. we generally agreed upon the division of compactions into Rewrite,
>>>> Convert and Merge.
>>>>
>>>> 2. Merge will be implemented through RewriteDataFiles as proposed in
>>>> https://github.com/apache/iceberg/pull/3207, but instead as a new
>>>> strategy by extending the existing BinPackStrategy. For users who would
>>>> also like to run sort during Merge, we will have another delete strategy
>>>> that extends the SortStrategy.
>>>>
>>>> 3. Merge can have an option that allows users to set the minimum
>>>> numbers of delete files to trigger a compaction. However, that would result
>>>> in very frequent compaction of full partition if people add many global
>>>> delete files. A Convert of global equality deletes to partition
>>>> position deletes while maintaining the same sequence number can be used to
>>>> solve the issue. Currently there is no way to write files with a custom
>>>> sequence number. This functionality needs to be added.
>>>>
>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>>> https://github.com/apache/iceberg/pull/2841.
>>>>
>>>> 5. we had some discussion around the separation of row and partition
>>>> level filters. The general direction in the meeting is to just have a
>>>> single filter method. We will sync offline to reach an agreement.
>>>>
>>>> 6. people raised the issue that if new delete files are added to a data
>>>> file while a Merge is going on, then the Merge would fail. That causes huge
>>>> performance issues for CDC streaming use cases and Merge is very hard to
>>>> succeed. There are 2 proposed solutions:
>>>>   (1) for hot partitions, users can try to only perform Convert and
>>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>>> becomes cold and a Merge can be performed safely.
>>>>   (2) it looks like we need a Merge strategy that does not do any
>>>> bin-packing, and only merges the delete files for each data file and writes
>>>> it back. The new data file will have the same sequence number as the old
>>>> file before Merge. By doing so, new delete files can still be applied
>>>> safely and the compaction can succeed without concerns around conflict. The
>>>> caveat is that this does not work for position deletes because the row
>>>> position changes for each file after Merge. But for the CDC streaming use
>>>> case it is acceptable to only write equality deletes, so this looks like a
>>>> feasible approach.
>>>>
>>>> 7. people raised the concern about the memory consumption issue for the
>>>> is_deleted metadata column. We ran out of time and will continue the
>>>> discussion offline on Slack.
>>>>
>>>> Best,
>>>> Jack Ye
>>>>
>>>>
>>>>
>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <ye...@gmail.com> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> We are planning to have a meeting to discuss the design of Iceberg
>>>>> delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>
>>>>> We have also created the channel #compaction on Slack, please join the
>>>>> channel for daily discussions if you are interested in the progress.
>>>>>
>>>>> Best,
>>>>> Jack Ye
>>>>>
>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <ye...@gmail.com> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> As there are more and more people adopting the v2 spec, we are seeing
>>>>>> an increasing number of requests for delete compaction support.
>>>>>>
>>>>>> Here is a document discussing the use cases and basic interface
>>>>>> design for it to get the community aligned around what compactions we would
>>>>>> offer and how the interfaces would be divided:
>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>
>>>>>> Any feedback would be appreciated!
>>>>>>
>>>>>> Best,
>>>>>> Jack Ye
>>>>>>
>>>>>