Posted to dev@iceberg.apache.org by Igor Basov <mr...@gmail.com> on 2021/06/08 16:29:20 UTC
FlinkSink UIDs problem
Hello,
I'm using Flink 1.11 with Iceberg 0.11.
I use `pipeline.auto-generate-uids: false` in my Flink configuration to
enforce assigning UIDs to operators, so that the job can be safely
stopped and its state restored from the latest checkpoint.
But when I use the Iceberg FlinkSink, I get this error:

    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator IcebergStreamWriter

I believe the problem is in this piece of code inside FlinkSink.java,
where neither transform has a UID assigned:

    DataStream<Void> returnStream = rowDataInput
        .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
        .setParallelism(writeParallelism)
        .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
        .setParallelism(1)
        .setMaxParallelism(1);

If that's the case, is there a workaround for this issue?
Re: FlinkSink UIDs problem
Posted by Steven Wu <st...@gmail.com>.
Hi Igor,
I created the PR: https://github.com/apache/iceberg/pull/2745
Please take a look if you can.
Thanks,
Steven
On Tue, Jun 8, 2021 at 8:24 PM Steven Wu <st...@gmail.com> wrote:
Re: FlinkSink UIDs problem
Posted by Steven Wu <st...@gmail.com>.
Igor,
I think your diagnosis is spot on.
Regarding a workaround, I think there are two options:
- pipeline.auto-generate-uids=true, which is probably not what you are
looking for
- avoid FlinkSink builder and write your own glue code
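To make the failing check and both workarounds concrete, here is a small stdlib-only Java model. It is a simplified sketch, not Flink's implementation: `UidCheckModel`, `Operator`, and `validate` are hypothetical names, the committer's operator name "IcebergFilesCommitter" and the explicit UIDs are assumed for illustration, and only the exception message is copied from the error in the original post.

```java
import java.util.List;

// Hypothetical, simplified model of the check behind the
// "Auto generated UIDs have been disabled" error. Not Flink's API.
public class UidCheckModel {

    // One operator in a pipeline: a display name and an optional user UID.
    static final class Operator {
        final String name;
        final String uid; // null when no UID was assigned

        Operator(String name, String uid) {
            this.name = name;
            this.uid = uid;
        }
    }

    // Throws if auto-generation is disabled and any operator lacks a UID.
    static void validate(List<Operator> operators, boolean autoGenerateUids) {
        if (autoGenerateUids) {
            return; // the framework would derive UIDs itself
        }
        for (Operator op : operators) {
            if (op.uid == null) {
                throw new IllegalStateException(
                    "Auto generated UIDs have been disabled but no UID or hash "
                        + "has been assigned to operator " + op.name);
            }
        }
    }

    public static void main(String[] args) {
        // The chain as the builder creates it: neither operator has a UID.
        List<Operator> builderChain = List.of(
            new Operator("IcebergStreamWriter", null),
            new Operator("IcebergFilesCommitter", null));

        // Workaround 1: re-enable auto-generated UIDs.
        validate(builderChain, true);

        // Workaround 2: hand-written glue code that assigns explicit UIDs.
        List<Operator> glueChain = List.of(
            new Operator("IcebergStreamWriter", "iceberg-writer"),
            new Operator("IcebergFilesCommitter", "iceberg-committer"));
        validate(glueChain, false);

        // The failing case: auto-generation disabled and no UIDs assigned.
        try {
            validate(builderChain, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The model stops at the first operator without a UID, which is consistent with the reported error naming only IcebergStreamWriter even though the committer also lacks a UID.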
As for the fix, we can probably add a `uid` method to the FlinkSink
builder. FlinkSink would then always set UIDs for its three operators as
"<uid>-writer", "<uid>-committer", and "<uid>-sink". If no uid is
provided, it defaults to the table name.
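A minimal sketch of the UID naming rule proposed above, assuming the builder keeps an optional `uid` string alongside the table name. `SinkUidSketch` and `operatorUids` are hypothetical names; this models only the naming rule, not the actual FlinkSink builder or the change in the PR.

```java
import java.util.List;

// Hypothetical sketch of the proposed naming rule; not Iceberg's API.
public class SinkUidSketch {

    // Derives the three operator UIDs from an optional user-supplied uid,
    // falling back to the table name when none (or an empty one) is given.
    static List<String> operatorUids(String uid, String tableName) {
        String base = (uid == null || uid.isEmpty()) ? tableName : uid;
        return List.of(base + "-writer", base + "-committer", base + "-sink");
    }

    public static void main(String[] args) {
        // prints [orders-writer, orders-committer, orders-sink]
        System.out.println(operatorUids("orders", "db.orders"));
        // prints [db.orders-writer, db.orders-committer, db.orders-sink]
        System.out.println(operatorUids(null, "db.orders"));
    }
}
```

Deriving all three UIDs from one value keeps them stable across restarts as long as the uid (or table name) stays the same, which is what lets Flink match restored state to operators. One consequence worth checking: two sinks for the same table in one job would get identical default UIDs, so they would need explicit, distinct uid values.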
Thanks,
Steven