Posted to dev@iceberg.apache.org by Reo Lei <le...@gmail.com> on 2022/02/09 15:04:53 UTC

[DISCUSS] Support streaming read Iceberg V2 table

Hi everyone,

As v2 tables become more and more popular, more users want to use Flink and
Iceberg to build quasi-real-time data warehouses.
But Iceberg does not currently support incremental reading of v2 tables via
Flink, so I drafted a design document
<https://docs.google.com/document/d/1zEpNYcA5Tf5ysdoj3jO425A1QRI-3OMb_Fy8EG_9DD4/edit?usp=sharing>
to support this. The document mainly discusses the type of data stream that
should be returned when incrementally reading a v2 table, and how to save
and read the changelog.
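
For context on the "type of data stream" question: Flink already models
changelog rows with org.apache.flink.types.RowKind (+I, -U, +U, -D). The tiny
sketch below only illustrates that model; the ChangelogRecord class is a
hypothetical placeholder, not part of Iceberg or of this proposal.

import org.apache.flink.types.RowKind;

/** Hypothetical illustration: a changelog row is a plain row plus a change kind. */
public class ChangelogRecord {
  private final RowKind kind; // INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), DELETE (-D)
  private final Object[] row; // the row values before or after the change

  public ChangelogRecord(RowKind kind, Object[] row) {
    this.kind = kind;
    this.row = row;
  }

  /** An update is usually emitted as a retract/insert pair in a changelog stream. */
  public static ChangelogRecord[] update(Object[] before, Object[] after) {
    return new ChangelogRecord[] {
        new ChangelogRecord(RowKind.UPDATE_BEFORE, before), // -U: retract the old version
        new ChangelogRecord(RowKind.UPDATE_AFTER, after)    // +U: emit the new version
    };
  }

  @Override
  public String toString() {
    return kind.shortString() + " " + java.util.Arrays.toString(row);
  }
}

The open question in the design doc is which of these kinds a streaming read
of a v2 table has to produce, and how they can be derived from data files,
equality deletes, and position deletes.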

Please have a look and any feedback would be appreciated!

Best Regards,
Reo Lei

Re: [DISCUSS] Support streaming read Iceberg V2 table

Posted by Reo Lei <le...@gmail.com>.
Thanks to Walaa and Yufei for your replies!

I admit that this solution, based on write-time logging, has the problems
that Yufei and Walaa mentioned.

I also tried to restore the change records from the data files and delete
files during the MOR (merge-on-read) process, but I ran into the following
problems:
1. At present, Iceberg's read path is oriented around data files, while
DELETE events are only recorded in delete files. To restore those events, I
think we first need to collect the set of primary keys from all data files
and from the equality-delete files, then take the keys that appear only in
the equality deletes and reconstruct the full DELETE events from them (see
the sketch after this list). This requires an extra pass over the
equality-delete data, so efficiency may be a problem, and it cannot restore
the original order of the DELETE events.
2. During MOR we can currently only tell whether a record has been deleted;
we cannot tell whether it was deleted by a position delete or by an
equality delete. I think this makes it impossible to decide whether a -U
event or a -D event should be generated when a record is deleted.
3. Since data is read concurrently at file granularity, the order of change
events can be disrupted. We need an additional mechanism to guarantee that
the order of events is not broken during reading.
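
To make problem 1 concrete (and to show where problem 2 bites), here is a
rough sketch of the key-set difference described above. It is not Iceberg
code: the Key and ChangeEvent types are hypothetical placeholders, and it
assumes the primary keys of all data files and of all equality-delete files
for a snapshot have already been collected.

import org.apache.flink.types.RowKind;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class DeleteEventSketch {

  /** Hypothetical placeholder for a row's primary key. */
  interface Key {}

  /** Hypothetical placeholder for a restored change event. */
  static class ChangeEvent {
    final RowKind kind;
    final Key key;

    ChangeEvent(RowKind kind, Key key) {
      this.kind = kind;
      this.key = key;
    }
  }

  /**
   * Restore DELETE events by diffing the key sets: a key that appears in the
   * equality deletes but in no data file of the snapshot has left the table.
   * This is the extra pass over the equality-delete data from problem 1, and
   * a set difference cannot recover the original order of the deletes.
   */
  static List<ChangeEvent> restoreDeletes(Set<Key> keysInDataFiles, Set<Key> keysInEqDeletes) {
    List<ChangeEvent> events = new ArrayList<>();
    for (Key key : keysInEqDeletes) {
      if (!keysInDataFiles.contains(key)) {
        // Problem 2: at this point we only know the row is gone; without knowing
        // whether it was removed by a position delete or an equality delete we
        // cannot tell -D (RowKind.DELETE) from -U (RowKind.UPDATE_BEFORE).
        // The sketch arbitrarily emits -D, which is exactly that ambiguity.
        events.add(new ChangeEvent(RowKind.DELETE, key));
      }
    }
    return events;
  }
}

Problem 3 still applies on top of this: even if events can be restored per
file, reading files concurrently scrambles their relative order, so some
extra ordering mechanism would be needed.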

If the above problems can be solved, I think restoring the change records
from the data files and delete files is a good choice, and I'd like to
integrate this functionality into Flink as well.

By the way, I found that many developers are trying to implement incremental
changelog reading, and many users care a lot about this feature. Maybe we
should create a project on GitHub and open a channel on Slack to discuss the
implementation (I didn't realize you were working on this feature until you
replied to my email).

Best,
Reo


Re: [DISCUSS] Support streaming read Iceberg V2 table

Posted by Yufei Gu <fl...@gmail.com>.
Hi Reo,

I agree with Walaa; the major concern is that the proposal requires a table
spec change and write-time logging.
We try to avoid table spec changes so that the feature can work with the
existing table format:
1. Users don't have to wait for a new table spec, which may take a while.
2. Users don't have to upgrade their tables, which is usually costly.

We also try to avoid write-time logging. An Iceberg table doesn't depend on
any specific engine, and write-time logging would mean that all clients
(Spark/Flink/Trino/customized clients) have to follow the logging format:
1. It's a non-trivial effort to make that change in all of them.
2. Without write-time logging, a write from an arbitrary client won't break
CDC record generation.

In today's community meeting, we discussed the solution we are working on
to get CDC records without a table spec change or write-time logging. We
will post the design doc soon.
Here is a related issue thread: https://github.com/apache/iceberg/issues/3941

Best,

Yufei

`This is not a contribution`


Re: [DISCUSS] Support streaming read Iceberg V2 table

Posted by Walaa Eldin Moustafa <wa...@gmail.com>.
Hi Reo,

I am not sure whether I am reading the proposal correctly, but does the
proposal suggest changing the data file format/schema to support the
operation type? I think one of the Iceberg principles is not to change the
open data file formats (Avro, ORC, Parquet, etc.) or their semantics in an
Iceberg-specific way.

Also, there is a similar discussion here [1], so we may want to combine the
discussions into the same thread.

[1] https://lists.apache.org/thread/w3nm6ydc702o1kjr5l3t8d6j01kwjqmz

Thanks,
Walaa.

