Posted to dev@iceberg.apache.org by Chen Song <ch...@gmail.com> on 2021/04/06 19:28:23 UTC
Re: how to test row level delete
Hey, I want to quickly follow up on this thread.
I cannot find any pull request that exposes the V2 format version at table
creation, specifically for the line below referenced in your previous email.
TableProperties.FORMAT_VERSION
Could you suggest how to do this? I want to create a V2 table to test some
row-level upserts/deletes.
Chen
On Sun, Dec 27, 2020 at 9:33 PM OpenInx <op...@gmail.com> wrote:
> > you can apply this patch in your own repository
>
> The patch is: https://github.com/apache/iceberg/pull/1978
>
> On Mon, Dec 28, 2020 at 10:32 AM OpenInx <op...@gmail.com> wrote:
>
>> Hi liubo07199,
>>
>> Thanks for testing Iceberg row-level deletes. I skimmed the code, and it
>> seems you were trying the equality-delete feature. I don't think Iceberg
>> users should have to write code against Iceberg internals to get this
>> working; that isn't user-friendly. Instead, equality deletes are usually
>> produced through a compute engine (CDC event ingestion or Flink
>> aggregation upsert streams). Currently, we support the Flink CDC-events
>> integration: the Flink DataStream integration has been merged [1], while
>> the Flink SQL integration depends on when we are ready to expose Iceberg
>> format v2 [2].
>>
>> For the timeline on exposing format v2 to users, you may want to read
>> this mail [3].
>>
>> If you just want a basic test of writing CDC events with Flink, you can
>> apply this patch in your own repository and then create an Iceberg table
>> with an extra option, like the following:
>>
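>> // Requires the patch above (PR #1978), which adds TableProperties.FORMAT_VERSION.
>> // `properties` must be a mutable map, because the format version is put into it.
>> public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
>>   PartitionSpec spec;
>>   if (partitioned) {
>>     spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
>>   } else {
>>     spec = PartitionSpec.unpartitioned();
>>   }
>>   // Request a format v2 table so that delete files can be committed.
>>   properties.put(TableProperties.FORMAT_VERSION, "2");
>>   return new HadoopTables().create(SCHEMA, spec, properties, path);
>> }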
>>
>> Then use the Flink DataStream API or Flink SQL to write the CDC events
>> into an Apache Iceberg table. For a DataStream job that sinks CDC events,
>> I suggest an approach similar to the one here [4].
>>
>> I'm happy to help if you have further feedback.
>>
>> Thanks.
>>
>> [1] https://github.com/apache/iceberg/pull/1974
>> [2] https://github.com/apache/iceberg/pull/1978
>> [3] https://mail-archives.apache.org/mod_mbox/iceberg-dev/202012.mbox/%3CCACc8XkGt%2B5kxr-XRMgY1eUKjd70mej38KFbhDuV2MH3AVMON2g%40mail.gmail.com%3E
>> [4] https://github.com/apache/iceberg/pull/1974/files#diff-13e2e5b52d0effe51e1b470df77cb08b5ec8cc4f3a7f0fd4e51ee212fc83f76aR143
>>
>> On Sat, Dec 26, 2020 at 7:14 PM 1 <li...@126.com> wrote:
>>
>>> Hi all,
>>>
>>> I want to try row-level deletes, but I get the exception:
>>> IllegalArgumentException: Cannot write delete files in a v1 table.
>>> I looked over https://iceberg.apache.org/spec/#table-metadata for
>>> format-version, and it says: "An integer version number for the format.
>>> Currently, this is always 1. Implementations must throw an exception if a
>>> table’s version is higher than the supported version." So what can I do
>>> to test row-level deletes? How do I create a v2 table?
>>>
>>> Thanks
>>>
>>> The code is:
>>>
>>> // `table` and `tmpFile` are fields defined elsewhere in the test class (not shown).
>>> // This code fails with "Cannot write delete files in a v1 table" when `table` is format v1.
>>> private static void deleteRead() throws IOException {
>>>   // Equality deletes keyed on the "id" column only.
>>>   Schema deleteRowSchema = table.schema().select("id");
>>>   Record dataDelete = GenericRecord.create(deleteRowSchema);
>>>   List<Record> dataDeletes = Lists.newArrayList(
>>>       dataDelete.copy("id", 11), // id = 29
>>>       dataDelete.copy("id", 12), // id = 89
>>>       dataDelete.copy("id", 13)  // id = 122
>>>   );
>>>
>>>   DeleteFile eqDeletes = writeDeleteFile(table, Files.localOutput(tmpFile), dataDeletes, deleteRowSchema);
>>>
>>>   // Commit the delete file as a row-level change.
>>>   table.newRowDelta()
>>>       .addDeletes(eqDeletes)
>>>       .commit();
>>> }
>>>
>>> private static DeleteFile writeDeleteFile(Table table, OutputFile out,
>>>     List<Record> deletes, Schema deleteRowSchema) throws IOException {
>>>   EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
>>>       .forTable(table)
>>>       .withPartition(Row.of("20201221"))
>>>       .rowSchema(deleteRowSchema)
>>>       .createWriterFunc(GenericParquetWriter::buildWriter)
>>>       .overwrite()
>>>       .equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
>>>       .buildEqualityWriter();
>>>
>>>   // The writer must be closed before toDeleteFile() can describe the finished file.
>>>   try (Closeable toClose = writer) {
>>>     writer.deleteAll(deletes);
>>>   }
>>>
>>>   return writer.toDeleteFile();
>>> }
>>>
>>> liubo07199
>>> liubo07199@hellobike.com
>>>
>>
--
Chen Song
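Background on the equality deletes discussed above: in the v2 table spec, an equality delete applies to a data file only when the data file's sequence number is strictly lower than the delete's. The sketch below is a minimal in-memory model of that rule, assuming a single unpartitioned table keyed on an integer `id` column; `EqualityDeleteSketch`, `DataRow`, `EqDelete`, `scan` and `demo` are hypothetical names, not Iceberg classes. It also shows why a CDC upsert can write the delete for a key and the new row for that key in the same commit without the new row being deleted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of v2 equality-delete semantics; not Iceberg's API.
final class EqualityDeleteSketch {

    /** A data row plus the data sequence number of the commit that added it. */
    static final class DataRow {
        final long seq;
        final int id;
        final String data;

        DataRow(long seq, int id, String data) {
            this.seq = seq;
            this.id = id;
            this.data = data;
        }
    }

    /** An equality delete keyed on "id", plus the sequence number of its commit. */
    static final class EqDelete {
        final long seq;
        final int id;

        EqDelete(long seq, int id) {
            this.seq = seq;
            this.id = id;
        }
    }

    /**
     * Returns the rows a reader would see. A row is dropped only if some equality
     * delete matches its key AND has a strictly greater sequence number.
     */
    static List<DataRow> scan(List<DataRow> rows, List<EqDelete> deletes) {
        List<DataRow> live = new ArrayList<>();
        for (DataRow row : rows) {
            boolean deleted = false;
            for (EqDelete d : deletes) {
                if (d.id == row.id && row.seq < d.seq) {
                    deleted = true;
                    break;
                }
            }
            if (!deleted) {
                live.add(row);
            }
        }
        return live;
    }

    /** Three commits: an insert, a plain delete, then an upsert of id 14. */
    static List<DataRow> demo() {
        List<DataRow> rows = new ArrayList<>();
        List<EqDelete> deletes = new ArrayList<>();

        // Commit 1 (seq 1): insert three rows.
        rows.add(new DataRow(1, 11, "a"));
        rows.add(new DataRow(1, 12, "b"));
        rows.add(new DataRow(1, 14, "c"));

        // Commit 2 (seq 2): equality-delete ids 11 and 12.
        deletes.add(new EqDelete(2, 11));
        deletes.add(new EqDelete(2, 12));

        // Commit 3 (seq 3): upsert id 14 = delete the key + insert the new row.
        deletes.add(new EqDelete(3, 14));
        rows.add(new DataRow(3, 14, "c2"));

        return scan(rows, deletes);
    }

    public static void main(String[] args) {
        for (DataRow r : demo()) {
            System.out.println(r.id + "=" + r.data); // prints 14=c2
        }
    }
}
```

Running main prints 14=c2: ids 11 and 12 are removed by commit 2 and the old row for 14 by commit 3, while the new row for 14 survives because its sequence number equals, rather than precedes, the delete's.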
Re: how to test row level delete
Posted by Chen Song <ch...@gmail.com>.
Thanks for the clarification.
On Tue, Apr 6, 2021 at 10:25 PM OpenInx <op...@gmail.com> wrote:
> Hi Chen Song,
>
> If you want to test format v2 in your environment, you could follow this
> comment https://github.com/apache/iceberg/pull/2410#issuecomment-812463051
> to upgrade your Iceberg table to format v2.
>
> TableProperties.FORMAT_VERSION was introduced in a separate PoC PR, so
> you will not find this static variable in the current Apache Iceberg
> master branch.
--
Chen Song
Re: how to test row level delete
Posted by OpenInx <op...@gmail.com>.
Hi Chen Song,
If you want to test format v2 in your environment, you could follow this
comment https://github.com/apache/iceberg/pull/2410#issuecomment-812463051
to upgrade your Iceberg table to format v2.
TableProperties.FORMAT_VERSION was introduced in a separate PoC PR, so you
will not find this static variable in the current Apache Iceberg master
branch.
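Upgrading an existing table works because format-version is a field in the table metadata, and the check behind the "Cannot write delete files in a v1 table" error reads that field before accepting delete files. The sketch below is a minimal model of those checks, assuming an implementation that supports versions 1 and 2; `FormatVersionSketch`, `TableMeta`, `read`, `upgradeTo` and `checkCanAddDeleteFiles` are hypothetical names, and only the v1 error message is taken from this thread. It does not reproduce the steps in the linked PR comment; with the Iceberg Java API of that time the upgrade was commonly done by committing `TableMetadata.upgradeToFormatVersion(2)` through the table's `TableOperations`, which is worth verifying against the comment itself.

```java
// Hypothetical model of format-version checks; names and most messages are illustrative.
final class FormatVersionSketch {

    // Assumption: this implementation understands format versions 1 and 2.
    static final int SUPPORTED_VERSION = 2;

    static final class TableMeta {
        final int formatVersion;

        private TableMeta(int formatVersion) {
            this.formatVersion = formatVersion;
        }

        /** Spec rule: refuse metadata whose version is higher than we support. */
        static TableMeta read(int formatVersion) {
            if (formatVersion > SUPPORTED_VERSION) {
                throw new IllegalArgumentException("Unsupported format version: v" + formatVersion);
            }
            return new TableMeta(formatVersion);
        }

        /** One-way upgrade: no downgrades and nothing beyond SUPPORTED_VERSION. */
        TableMeta upgradeTo(int newVersion) {
            if (newVersion < formatVersion) {
                throw new IllegalArgumentException(
                    "Cannot downgrade v" + formatVersion + " table to v" + newVersion);
            }
            if (newVersion > SUPPORTED_VERSION) {
                throw new IllegalArgumentException("Unsupported format version: v" + newVersion);
            }
            return newVersion == formatVersion ? this : new TableMeta(newVersion);
        }

        /** Delete files are a v2 feature; the message matches the error in this thread. */
        void checkCanAddDeleteFiles() {
            if (formatVersion < 2) {
                throw new IllegalArgumentException("Cannot write delete files in a v1 table");
            }
        }
    }

    public static void main(String[] args) {
        TableMeta table = TableMeta.read(1);
        try {
            table.checkCanAddDeleteFiles();
        } catch (IllegalArgumentException e) {
            System.out.println("v1: " + e.getMessage());
        }
        table = table.upgradeTo(2);
        table.checkCanAddDeleteFiles(); // no exception once the table is v2
        System.out.println("upgraded to v" + table.formatVersion);
    }
}
```

The upgrade returns a new metadata object rather than mutating the old one, mirroring how table metadata changes are committed as new versions.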