Posted to dev@iceberg.apache.org by Taher Koitawala <ta...@gmail.com> on 2022/09/01 05:34:14 UTC

Re: Temporal Iceberg Service

Thank you, Ryan and the Iceberg community. The suggestions really helped
move development forward. On the same use case, I have hit another
blocker: handling CDC updates and deletes.

I see two options for managing deletes for now, EqualityDeletes and
PositionalDeletes:

   1. EqualityDeletes need me to write the key to delete, which Iceberg
   then matches at scan time to skip those records.
      1. The problem here is that when a record with delete key 1 is
      inserted, then deleted, and then inserted again with key 1, Iceberg
      shows no records. I guess that is how it is intended to work, but it
      means I need to be more careful when writing to Iceberg.
   2. PositionalDeletes are amazing because I can give the file name and
   the offset of the record I want deleted. I can handle updates as a
   delete plus an insert, and the case above is handled.
      1. Where I am stuck is this: after records have been inserted into
      a file, if I see a delete or update request, how do I find the
      offset of the record to delete in the inserts file?
      2. Do I need to do a table scan every time I get a delete request?
      That would mean a lot of IO, and the CDC implementation would be
      crazy slow.

Could you please suggest the correct way to apply CDC log files with a
JVM task?

Regards,
Taher Koitawala





On Thu, Aug 25, 2022 at 9:39 AM Taher Koitawala <ta...@gmail.com> wrote:

> Thank you for your response, Ryan. We will evaluate your suggestion to
> stick with a query engine, and I will also try the code you shared with me.
>
>
>
> On Thu, 25 Aug, 2022, 2:25 am Ryan Blue, <bl...@tabular.io> wrote:
>
>> Hi Taher,
>>
>> It looks like you’re writing something in Java to work with the data
>> directly. That’s well supported, but you may want to consider using a
>> compute engine to make this process a bit easier. Most of the issues that
>> you’re hitting would probably be solved automatically because those engines
>> will translate correctly to and from Iceberg and other formats.
>>
>> Assuming that you want to move forward with Java, I think the issue
>> you’re hitting is that you’re not using the same in-memory object model to
>> read and write and are then trying to translate values by hand. Instead, I
>> recommend using Iceberg readers for both reading and writing so that you
>> get consistent in-memory records.
>>
>> To do that, you should use the Parquet class to instantiate both the
>> reader and writer. You’ll need to use an Iceberg schema from the table
>> you’re writing into (which has assigned the field IDs). For the reader,
>> you’ll also need to pass a name mapping (withNameMapping) that you can
>> generate from the schema with MappingUtil.create(icebergSchema). That
>> name mapping enables you to read Parquet files without field IDs.
>>
>> Once you have it set up, it should look something like this:
>>
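>> // Derive the Iceberg schema from the Parquet footer, then create the table.
>> Schema schema = ParquetSchemaUtil.convert(parquetSchema);
>> Table table = catalog.createTable(identifier, schema);
>> // The name mapping lets the reader resolve columns in files without field IDs.
>> NameMapping nameMapping = MappingUtil.create(table.schema());
>>
>> try (CloseableIterable<Record> reader = Parquet.read(io.newInputFile("file.parquet"))
>>     .project(table.schema())
>>     .withNameMapping(nameMapping)
>>     .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
>>     .build()) {
>>   try (DataWriter<Record> writer = Parquet.writeData(io.newOutputFile("new_file.parquet"))
>>       .forTable(table)
>>       .createWriterFunc(GenericParquetWriter::buildWriter)
>>       .build()) {
>>     for (Record record : reader) {
>>       writer.write(record);
>>     }
>>   }
>> }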
>>
>> Ryan
>>
>> On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <ta...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>          Please can someone guide me regarding the above email?
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <ta...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>         I am creating an iceberg writer over temporal service that
>>>> converts CDC parquet files to Iceberg format. That means that the file will
>>>> have a record and corresponding timestamp flags like `inserted_at`,
>>>> `deleted_at` and `updated_at`, each of which will have a value defining the
>>>> action.
>>>>
>>>> Initially, when there is no table in the iceberg catalog, the plan is
>>>> to use the Parquet footer schema and map that directly to the Iceberg
>>>> schema using *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>>>> parquetSchema).* However, the issue that I am facing is that I am also
>>>> having to convert Parquet datatypes to Iceberg datatypes, specifically the
>>>> timestamp types when inserting into the table.
>>>>
>>>> When using the Parquet reader with the simple group, I see the
>>>> timestamp as a long, but when inserting into Iceberg it is expected to be
>>>> *java.time.OffsetDateTime*. The specific error I get is `Long cannot be
>>>> cast to OffsetDateTime`.
>>>>
>>>> I have 2 questions on this use case:
>>>> 1. Is there an easy way to insert parquet to iceberg records directly
>>>> without me having to do a type conversion since the goal is to make it all
>>>> happen within temporal?
>>>> 2. Need suggestions to handle updates. As for updates I'm having to
>>>> commit inserts and then commit deletes and then create a new writer again
>>>> to proceed.
>>>>
>>>> Regards,
>>>> Taher Koitawala
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Tabular
>>
>
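
On the `Long cannot be cast to OffsetDateTime` error from the original
message: Ryan's advice is to avoid translating values by hand. If a
hand conversion is still needed somewhere, a minimal sketch might look
like the following. It assumes the long holds microseconds since the
Unix epoch in UTC, which is not stated in the thread (the Parquet column
could also be in milliseconds), and `TimestampConversion` and
`fromEpochMicros` are hypothetical names.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

class TimestampConversion {
  /**
   * Converts a raw timestamp value to an OffsetDateTime in UTC.
   * Assumption: the value is microseconds since 1970-01-01T00:00:00Z.
   */
  static OffsetDateTime fromEpochMicros(long micros) {
    return Instant.EPOCH.plus(micros, ChronoUnit.MICROS).atOffset(ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    // 1.5 seconds after the epoch
    System.out.println(fromEpochMicros(1_500_000L));
  }
}
```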

Re: Temporal Iceberg Service

Posted by Ryan Blue <bl...@tabular.io>.
Taher,

Positional deletes require that you know the file and position in that file
of the record you want to delete. So you're right that if you want to use a
positional delete, you need to either keep an index of where records are
(which is what we do when upserting records) or scan to find the records to
remove (which is what we do in MERGE INTO commands).
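
The index approach can be sketched as follows. This is a simplified
in-memory model, not Iceberg's writer API: `DeltaWriterSketch`,
`RowLocation`, and the string keys are hypothetical. It assumes the
writer remembers the file and position of every row it has written
since it was opened, uses that to emit a positional delete, and falls
back to an equality delete for keys it has not seen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of a CDC writer that indexes the rows it has written. */
class DeltaWriterSketch {
  /** Data file path plus zero-based row position within that file. */
  static final class RowLocation {
    final String path;
    final long position;

    RowLocation(String path, long position) {
      this.path = path;
      this.position = position;
    }
  }

  private final String currentFile;
  private long nextPosition = 0;
  // Index of keys written by this writer since it was opened.
  private final Map<String, RowLocation> insertedRows = new HashMap<>();
  final List<RowLocation> positionDeletes = new ArrayList<>();
  final List<String> equalityDeletes = new ArrayList<>();

  DeltaWriterSketch(String currentFile) {
    this.currentFile = currentFile;
  }

  void insert(String key) {
    RowLocation previous = insertedRows.put(key, new RowLocation(currentFile, nextPosition++));
    if (previous != null) {
      // The same key was written earlier by this writer: hide the older copy.
      positionDeletes.add(previous);
    }
  }

  void delete(String key) {
    RowLocation location = insertedRows.remove(key);
    if (location != null) {
      positionDeletes.add(location); // row location is known from the index
    } else {
      equalityDeletes.add(key); // row was written elsewhere: match by key at read time
    }
  }

  public static void main(String[] args) {
    DeltaWriterSketch writer = new DeltaWriterSketch("data-001.parquet");
    writer.insert("1");
    writer.delete("1");
    writer.insert("1");
    System.out.println(writer.positionDeletes.size() + " position delete(s), "
        + writer.equalityDeletes.size() + " equality delete(s)");
  }
}
```

With this model, insert, delete, and insert of key 1 within one open
writer leaves the second copy visible, because only the first copy's
position is deleted.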

Usually, you want to use an engine that has existing support for these
since there are a lot of trade-offs to how you use the primitives from the
format itself. Most of the time, I'd recommend taking each new batch of
data and using MERGE INTO to add it to your table. For high volume CDC, I'd
recommend landing the changes directly as a fact table --- that requires no
special handling --- and compacting it into a final table periodically.
That's probably going to be the most efficient way to handle the stream.
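
The compaction step can be illustrated with a small in-memory sketch.
In practice this would run as an engine job over the fact table; the
class below only shows the replay logic. `ChangeLogCompaction`,
`Change`, and the `sequence` ordering field are hypothetical, and the
sketch assumes every change carries a value that orders it relative to
the others.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChangeLogCompaction {
  enum Op { INSERT, UPDATE, DELETE }

  /** One row of the change log (the "fact table" of CDC events). */
  static final class Change {
    final String key;
    final String value; // null for deletes
    final Op op;
    final long sequence; // assumed ordering column, e.g. source commit order

    Change(String key, String value, Op op, long sequence) {
      this.key = key;
      this.value = value;
      this.op = op;
      this.sequence = sequence;
    }
  }

  /** Replays changes in sequence order and returns the surviving key -> value state. */
  static Map<String, String> compact(List<Change> changeLog) {
    List<Change> ordered = new ArrayList<>(changeLog);
    ordered.sort(Comparator.comparingLong(c -> c.sequence));
    Map<String, String> state = new LinkedHashMap<>();
    for (Change change : ordered) {
      if (change.op == Op.DELETE) {
        state.remove(change.key);
      } else {
        state.put(change.key, change.value); // insert and update both set the latest value
      }
    }
    return state;
  }

  public static void main(String[] args) {
    List<Change> log = new ArrayList<>();
    log.add(new Change("1", "a", Op.INSERT, 1));
    log.add(new Change("1", null, Op.DELETE, 2));
    log.add(new Change("1", "b", Op.INSERT, 3));
    System.out.println(compact(log)); // prints {1=b}
  }
}
```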

Ryan

On Thu, Sep 1, 2022 at 9:13 AM Taher Koitawala <ta...@gmail.com> wrote:

> My question is not about planning a scan. It is about the CDC log
> implementation. Suppose a writer is open and I get an insert and a delete
> for a record. If I do an EQ delete, that record is gone.
>
> However, if I insert, delete, and insert the exact same record while the
> writer is still open, I will not see any records for the key because the
> EQ delete kicked in.
>
> That case is resolved by a positional delete. So my question is: say I
> get a delete for a record that has already been flushed to some file. How
> do I tell the positional delete writer which file and offset to put in
> the delete file? Do I have to do a table scan for that, or is there a
> more efficient way?

-- 
Ryan Blue
Tabular

Re: Temporal Iceberg Service

Posted by Taher Koitawala <ta...@gmail.com>.
My question is not about planning a scan. It is about the CDC log
implementation. Suppose a writer is open and I get an insert and a delete
for a record. If I do an EQ delete, that record is gone.

However, if I insert, delete, and insert the exact same record while the
writer is still open, I will not see any records for the key because the
EQ delete kicked in.

That case is resolved by a positional delete. So my question is: say I
get a delete for a record that has already been flushed to some file. How
do I tell the positional delete writer which file and offset to put in
the delete file? Do I have to do a table scan for that, or is there a
more efficient way?

On Thu, 1 Sep, 2022, 7:00 pm Zoltán Borók-Nagy,
<bo...@cloudera.com.invalid> wrote:

> Hi Taher,
>
> I think most of your questions are answered in the Scan Planning section
> at the Iceberg spec page: https://iceberg.apache.org/spec/#scan-planning
>
> To give you some specific answers as well:
> Equality Deletes: data and delete files have sequence numbers from which
> readers can infer the relative age of the data. Delete files are only
> applied to older data files. This means if you insert data again with a key
> that was deleted earlier then Iceberg should show the newly inserted record.
>
> Position Deletes: When reading data files, the reader must keep track of
> the file position and only return rows that do not have a record in the
> delete files. Alternatively you can do a big ANTI JOIN between data files
> and delete files. This latter was our approach in Impala:
> https://docs.google.com/document/d/1WF_UOanQ61RUuQlM4LaiRWI0YXpPKZ2VEJ8gyJdDyoY/edit#heading=h.5bmfhbmb4qdk
>
> Cheers,
>     Zoltan

Re: Temporal Iceberg Service

Posted by Zoltán Borók-Nagy <bo...@cloudera.com.INVALID>.
Hi Taher,

I think most of your questions are answered in the Scan Planning section at
the Iceberg spec page: https://iceberg.apache.org/spec/#scan-planning

To give you some specific answers as well:
Equality Deletes: data and delete files have sequence numbers from which
readers can infer the relative age of the data. Delete files are only
applied to older data files. This means if you insert data again with a key
that was deleted earlier then Iceberg should show the newly inserted record.
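
The sequence-number rule can be written down as two small predicates.
This is a sketch of the comparison described in the spec's scan
planning section, not Iceberg library code: `DeleteApplicability` and
its method names are hypothetical, and partition matching, which the
spec also requires, is left out.

```java
class DeleteApplicability {
  /** An equality delete applies only to data files with a strictly lower sequence number. */
  static boolean equalityDeleteApplies(long dataFileSequence, long deleteFileSequence) {
    return dataFileSequence < deleteFileSequence;
  }

  /** A position delete also applies to data files with the same sequence number. */
  static boolean positionDeleteApplies(long dataFileSequence, long deleteFileSequence) {
    return dataFileSequence <= deleteFileSequence;
  }

  public static void main(String[] args) {
    // Insert committed at sequence 1, equality delete at 2, re-insert at 3.
    System.out.println(equalityDeleteApplies(1, 2)); // true: the first insert is hidden
    System.out.println(equalityDeleteApplies(3, 2)); // false: the re-insert stays visible
  }
}
```

One consequence, assuming inserts and deletes go in separate commits:
if both copies of a key are committed before the equality delete, both
have lower sequence numbers and both are hidden, which would match the
behaviour reported earlier in the thread.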

Position Deletes: When reading data files, the reader must keep track of
the file position and only return rows that do not have a record in the
delete files. Alternatively you can do a big ANTI JOIN between data files
and delete files. This latter was our approach in Impala:
https://docs.google.com/document/d/1WF_UOanQ61RUuQlM4LaiRWI0YXpPKZ2VEJ8gyJdDyoY/edit#heading=h.5bmfhbmb4qdk
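
The first approach, tracking the file position while reading, can be
sketched like this. It is an in-memory illustration only:
`PositionDeleteFilter` and `readLive` are hypothetical names, and the
sketch assumes the deleted positions for one data file have already
been collected from the delete files into a set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PositionDeleteFilter {
  /** Returns the rows of one data file whose positions are not marked as deleted. */
  static <T> List<T> readLive(List<T> rows, Set<Long> deletedPositions) {
    List<T> live = new ArrayList<>();
    long position = 0; // zero-based row position within the data file
    for (T row : rows) {
      if (!deletedPositions.contains(position)) {
        live.add(row);
      }
      position++;
    }
    return live;
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("a", "b", "c", "d");
    Set<Long> deleted = new HashSet<>(Arrays.asList(1L, 3L));
    System.out.println(readLive(rows, deleted)); // prints [a, c]
  }
}
```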

Cheers,
    Zoltan



On Thu, Sep 1, 2022 at 7:34 AM Taher Koitawala <ta...@gmail.com> wrote:

> Thank you, Ryan and the Iceberg community. The suggestions really helped
> move development forward. On the same use case, I have hit another
> blocker: handling CDC updates and deletes.
>
> I see two options for managing deletes for now, EqualityDeletes and
> PositionalDeletes:
>
>    1. EqualityDeletes need me to write the key to delete, which Iceberg
>    then matches at scan time to skip those records.
>       1. The problem here is that when a record with delete key 1 is
>       inserted, then deleted, and then inserted again with key 1, Iceberg
>       shows no records. I guess that is how it is intended to work, but it
>       means I need to be more careful when writing to Iceberg.
>    2. PositionalDeletes are amazing because I can give the file name and
>    the offset of the record I want deleted. I can handle updates as a
>    delete plus an insert, and the case above is handled.
>       1. Where I am stuck is this: after records have been inserted into
>       a file, if I see a delete or update request, how do I find the
>       offset of the record to delete in the inserts file?
>       2. Do I need to do a table scan every time I get a delete request?
>       That would mean a lot of IO, and the CDC implementation would be
>       crazy slow.
>
> Could you please suggest the correct way to apply CDC log files with a
> JVM task?
>
> Regards,
> Taher Koitawala