Posted to dev@flink.apache.org by Krzysztof Chmielewski <kr...@gmail.com> on 2022/09/07 10:18:17 UTC

Migrating Sink v1 to v2

Hi,
I'm a co-author of the open-source Delta-Flink connector hosted at [1].
The connector was originally written for Flink 1.12 and we have since
migrated it to 1.14.
Both the sink and the source use the new Unified API from Flink 1.12.

I'm evaluating a migration to Flink 1.15, where Sink v1 was marked as
deprecated.
After the migration, one of our integration tests for the Sink started to
fail in the cluster failover scenario [2].
The test is heavily based on Flink's StreamingExecutionFileSinkITCase [3],
but since we use JUnit 5, we do not extend that Flink class.

For our 1.15 test setup I'm using `SinkV1Adapter.wrap(...)` to wrap our V1
Sink instance.

The test fails in one of two ways:

Caused by: java.lang.NullPointerException: Unknown subtask for 1
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.getSubtaskCommittableManager(CheckpointCommittableManagerImpl.java:96)
at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.addCommittable(CheckpointCommittableManagerImpl.java:90)
at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)


OR

Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)

The test passes on Flink 1.12, 1.13 and 1.14.

I would appreciate any suggestions as to what might be causing this.

Thanks,
Krzysztof Chmielewski


[1] https://github.com/delta-io/connectors/tree/master/flink
[2]
https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
[3]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java

Re: Migrating Sink v1 to v2

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Krzysztof,
I'll continue tracking the issue; hopefully it
can be fixed before the next minor releases of
1.15.x, 1.16.x and 1.17.
Best,
Yun Gao
------------------------------------------------------------------
From: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
Send Time: 2022 Sep. 10 (Sat.) 04:04
To: dev <dev@flink.apache.org>
Subject: Re: Migrating Sink v1 to v2
It seems that there might be an issue with state recovery, or some kind of
concurrency issue, in the GlobalCommitterOperator created by SinkV1Adapter,
which uses the StandardSinkTopologies.addGlobalCommitter method, in scenarios
with cluster failover.
I've tried to locate where the issue is. From what I can tell, for a setup
with two writers, a SubtaskCommittableManager for each writer task is
registered in CheckpointCommittableManagerImpl via the upsertSummary method.
Then, CommittableWithLineage objects are processed in
CheckpointCommittableManagerImpl.addCommittable by the
SubtaskCommittableManager corresponding to the subtaskId taken from
CommittableWithLineage.getSubtaskId().
However, in the failover scenario (Task Manager crash), where for example
an exception is thrown in the source, CheckpointCommittableManagerImpl
is recreated without any SubtaskCommittableManager, although that could
actually be OK. What is more important is that during recovery,
CheckpointCommittableManagerImpl.addCommittable is called twice, but
for a CommittableSummary with the same subtaskId, for example 0, instead of
0 and 1 as happened before the deliberate TM crash.
The later CommittableSummary fails with the exception pointing to FLINK-25920.
After that, or sometimes before the second CommittableSummary arrives,
CheckpointCommittableManagerImpl.addCommittable is called for subtaskId 1,
which is not registered in CheckpointCommittableManagerImpl. That causes the
NPE.
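The two failure paths described above can be reproduced in a toy model. The following is a minimal plain-Java sketch, not Flink code: `CheckpointManagerModel` is a hypothetical stand-in for CheckpointCommittableManagerImpl, and its method bodies are assumptions inferred only from the exception messages and stack traces in this thread. It merely illustrates why a repeated summary for subtask 0, or a committable for an unregistered subtask 1, would end in the two reported exceptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of the per-checkpoint bookkeeping; all names here are illustrative. */
public class CommittableBookkeepingSketch {

    /** Hypothetical stand-in for CheckpointCommittableManagerImpl (assumed behavior). */
    static class CheckpointManagerModel {
        // subtaskId -> number of committables received so far for that subtask
        private final Map<Integer, Integer> committablesPerSubtask = new HashMap<>();

        /** Registers a subtask when its summary arrives; a second summary is rejected. */
        void upsertSummary(int subtaskId) {
            if (committablesPerSubtask.containsKey(subtaskId)) {
                throw new UnsupportedOperationException(
                        "Currently it is not supported to update the CommittableSummary "
                                + "for a checkpoint coming from the same subtask.");
            }
            committablesPerSubtask.put(subtaskId, 0);
        }

        /** Counts a committable for its subtask; fails if the subtask was never registered. */
        void addCommittable(int subtaskId) {
            Integer count = Objects.requireNonNull(
                    committablesPerSubtask.get(subtaskId),
                    "Unknown subtask for " + subtaskId);
            committablesPerSubtask.put(subtaskId, count + 1);
        }
    }

    /** Runs a scenario and returns "ok" or the simple name of the exception thrown. */
    static String outcome(Runnable scenario) {
        try {
            scenario.run();
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    /** Before the crash: summaries from subtasks 0 and 1, then their committables. */
    public static String normalRun() {
        CheckpointManagerModel manager = new CheckpointManagerModel();
        return outcome(() -> {
            manager.upsertSummary(0);
            manager.upsertSummary(1);
            manager.addCommittable(0);
            manager.addCommittable(1);
        });
    }

    /** After recovery: two summaries both carrying subtaskId 0. */
    public static String duplicateSummary() {
        CheckpointManagerModel manager = new CheckpointManagerModel();
        return outcome(() -> {
            manager.upsertSummary(0);
            manager.upsertSummary(0);
        });
    }

    /** After recovery: a committable for subtask 1, which was never registered. */
    public static String unknownSubtask() {
        CheckpointManagerModel manager = new CheckpointManagerModel();
        return outcome(() -> {
            manager.upsertSummary(0);
            manager.addCommittable(1);
        });
    }

    public static void main(String[] args) {
        System.out.println(normalRun());
        System.out.println(duplicateSummary());
        System.out.println(unknownSubtask());
    }
}
```

Running `main` prints `ok`, `UnsupportedOperationException` and `NullPointerException`, one per line. Which of the two exceptions appears first in the real test would then depend only on the arrival order of the messages, which matches the "one of two ways" behavior reported earlier in the thread.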
My streaming environment has the following configuration:
// env is the test's StreamExecutionEnvironment
Configuration config = new Configuration();
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
env.configure(config, getClass().getClassLoader());
// checkpoint every 5000 ms with exactly-once guarantees
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
For now, I don't have a more compact reproducer than the testFileSink
test from [1].
[1] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
On Wed, 7 Sep 2022 at 13:22, Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> wrote:
> A small update:
> When I change the number of sinks from 3 to 1, the test passes.

Re: Migrating Sink v1 to v2

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Yun Gao,
Thank you very much for this.

Do you see this ticket being picked up any time soon?
For a moment I was thinking about taking my chance with it, but I'm
guessing it would be rather hard without a deeper understanding of Flink's
internals, plus it's marked as "major".

Regards,
Krzysztof Chmielewski


Re: Migrating Sink v1 to v2

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Krzysztof,
Very sorry for the long delay; it took a bit of time to do a full investigation.
The issue is most likely caused by implementation bugs, and I have filed an issue
for them [1].
For follow-up actions, let's move to thread [2] so the discussion stays in one place.
Very sorry for the inconvenience.
Best,
Yun Gao
[1] https://issues.apache.org/jira/browse/FLINK-29459
[2] https://lists.apache.org/thread/wzwkqd08qkcmmf5m2xroxpxnzzwfphc9

Re: Migrating Sink v1 to v2

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
It seems there might be an issue with state recovery, or some kind of
concurrency issue, in the GlobalCommitterOperator created by SinkV1Adapter
(which uses the StandardSinkTopologies.addGlobalCommitter method) in
scenarios with cluster failover.

I've tried to locate the issue. From what I can tell, in a setup with two
writers, a SubtaskCommittableManager for each writer task is registered in
CheckpointCommittableManagerImpl via the upsertSummary method.

Then, each CommittableWithLineage object is processed in
CheckpointCommittableManagerImpl.addCommittable by the
SubtaskCommittableManager corresponding to the subtaskId taken from
CommittableWithLineage.getSubtaskId().

However, in the failover scenario (Task Manager crash), where for example
an exception is thrown in the source, CheckpointCommittableManagerImpl is
recreated without any SubtaskCommittableManager, although that could
actually be OK. What is more important is that during recovery,
CheckpointCommittableManagerImpl.upsertSummary is called twice with a
CommittableSummary carrying the same subtaskId, for example 0 and 0
instead of 0 and 1 as before the deliberately triggered TM crash.
The second CommittableSummary fails with the exception pointing to
FLINK-25920.

After that, or sometimes before the second CommittableSummary arrives,
CheckpointCommittableManagerImpl.addCommittable is called for subtaskId 1,
which is not registered in CheckpointCommittableManagerImpl. That causes
the NPE.
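
To make the two failure paths easier to follow, here is a minimal,
self-contained model of the bookkeeping described above. It is only a
sketch and not Flink's code: the class CommittableBookkeepingSketch and
the replay helper are hypothetical, and the real
CheckpointCommittableManagerImpl tracks far more state. The only behaviour
taken from the stack traces is that a second summary for the same subtask
is rejected and that a committable for an unregistered subtask fails with
"Unknown subtask for <id>".

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of per-checkpoint bookkeeping; not Flink's code. */
class CommittableBookkeepingSketch {

    // subtaskId -> number of committables received from that subtask
    private final Map<Integer, Integer> subtasks = new HashMap<>();

    /** Registers a summary; a second summary for the same subtask is rejected. */
    void upsertSummary(int subtaskId) {
        if (subtasks.containsKey(subtaskId)) {
            throw new UnsupportedOperationException(
                    "Summary already registered for subtask " + subtaskId);
        }
        subtasks.put(subtaskId, 0);
    }

    /** Routes a committable to its subtask; fails if no summary was seen for it. */
    void addCommittable(int subtaskId) {
        Integer count = subtasks.get(subtaskId);
        if (count == null) {
            throw new NullPointerException("Unknown subtask for " + subtaskId);
        }
        subtasks.put(subtaskId, count + 1);
    }

    /** Replays the summaries, then one committable; returns the failure type or "ok". */
    static String replay(int[] summarySubtaskIds, int committableSubtaskId) {
        CommittableBookkeepingSketch manager = new CommittableBookkeepingSketch();
        try {
            for (int id : summarySubtaskIds) {
                manager.upsertSummary(id);
            }
            manager.addCommittable(committableSubtaskId);
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Before the crash: summaries from subtasks 0 and 1, then a committable from 1.
        System.out.println(replay(new int[] {0, 1}, 1));
        // After recovery: two summaries carrying the same subtask id 0.
        System.out.println(replay(new int[] {0, 0}, 1));
        // After recovery: a committable from subtask 1 arrives before its summary.
        System.out.println(replay(new int[] {0}, 1));
    }
}
```

The three calls in main correspond to the healthy run, the
UnsupportedOperationException case and the NullPointerException case.
Which of the two errors shows up first depends only on the arrival order
of the messages, which would match the test failing in one of two ways.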

My streaming environment has the following configuration:
Configuration config = new Configuration();
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
env.configure(config, getClass().getClassLoader());
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

For now, I don't have a more compact reproducer than the testFileSink
test from [1].

[1]
https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java




Re: Migrating Sink v1 to v2

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
A small update,
When I change the number of sinks from 3 to 1, the test passes.
