Posted to issues@flink.apache.org by "Krzysztof Chmielewski (Jira)" <ji...@apache.org> on 2022/10/11 20:44:00 UTC

[jira] [Updated] (FLINK-29589) Data Loss in Sink GlobalCommitter during Task Manager recovery

     [ https://issues.apache.org/jira/browse/FLINK-29589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krzysztof Chmielewski updated FLINK-29589:
------------------------------------------
    Description: 
Flink's Sink architecture with a global committer appears to be vulnerable to data loss during Task Manager recovery. An entire checkpoint can be lost by the GlobalCommitter, resulting in data loss for sinks.

The issue was observed in the Delta Sink connector on a real 1.14.x cluster and was reproduced using Flink's 1.14.6 test utility classes.

Scenario:
 # A streaming source emits a constant number of events per checkpoint (20 events per checkpoint, over 5 commits).
 # A sink with parallelism > 1, containing both Committer and GlobalCommitter elements.
 # Committers processed the committables for checkpointId 2.
 # The GlobalCommitter throws an exception (a deliberately injected one) during checkpointId 2 (the third commit) while processing data from checkpoint 1 (by design, the global committer lags one commit behind the rest of the pipeline); a sketch of such a failing global committer follows this list.
 # The streaming source ends.
 # We are missing 20 records (one checkpoint).
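
The actual reproduction uses Flink's TestSink (linked below). Purely as an illustrative sketch of the fault injection in step 4, assuming the Flink 1.14 SinkV1 `GlobalCommitter` interface and plain String committables (my own simplification, not what the linked test uses), a global committer that fails on its third commit could look roughly like this:

{code:java}
// Minimal sketch only, NOT the real test or connector code: assumes the Flink 1.14
// SinkV1 interface org.apache.flink.api.connector.sink.GlobalCommitter and String
// committables. It deliberately fails on its third commit, mimicking step 4 above.
import org.apache.flink.api.connector.sink.GlobalCommitter;

import java.util.Collections;
import java.util.List;

public class FailingThirdCommitGlobalCommitter implements GlobalCommitter<String, String> {

    private int commitCalls = 0;

    @Override
    public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
        // Keep everything recovered from state so it is retried after a failover.
        return globalCommittables;
    }

    @Override
    public String combine(List<String> committables) {
        // Collapse the per-subtask committables of one checkpoint into one global committable.
        return String.join(",", committables);
    }

    @Override
    public List<String> commit(List<String> globalCommittables) {
        commitCalls++;
        if (commitCalls == 3) {
            // Injected failure on the third commit, i.e. while handling checkpoint 1 data
            // during checkpointId 2, as in the scenario above.
            throw new RuntimeException("Designed exception on the third global commit");
        }
        // An empty list means nothing needs to be retried.
        return Collections.emptyList();
    }

    @Override
    public void endOfInput() {
        // Nothing to flush in this sketch.
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
{code}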

What is happening is that during recovery the committers perform a "retry" on the committables for checkpointId 2; however, the committables reprocessed by that "retry" task are never emitted to the global committer.

The issue can be reproduced with a JUnit test built on Flink's TestSink.
The test was implemented [here|https://github.com/kristoffSC/flink/blob/Flink_1.14_DataLoss_SinkGlobalCommitter/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java#:~:text=testGlobalCommitterMissingRecordsDuringRecovery].

I believe the problem is somewhere around the `SinkOperator::notifyCheckpointComplete` method. There we can see that the retry async task is scheduled, but its result is never emitted downstream the way it is for the regular flow one line above.
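
To make the suspected asymmetry concrete, here is a self-contained toy (my own illustration, not Flink source; all names are made up): committables handled by the regular flow are forwarded downstream to the global committer, while committables reprocessed by the retry path are committed but never forwarded.

{code:java}
// Toy illustration only, NOT Flink code; all names are made up.
import java.util.ArrayList;
import java.util.List;

public class RetryEmitAsymmetryDemo {

    // Stand-in for what the downstream GlobalCommitter receives.
    private final List<String> globalCommitterInput = new ArrayList<>();

    // Regular flow: commit the committables and emit the result downstream.
    void notifyCheckpointCompleted(List<String> committables) {
        List<String> committed = commit(committables);
        globalCommitterInput.addAll(committed); // emitted downstream
    }

    // Suspected retry flow after recovery: commit again, but drop the result.
    void retryAfterRecovery(List<String> committables) {
        commit(committables); // result never emitted downstream
    }

    private List<String> commit(List<String> committables) {
        // Pretend every commit succeeds.
        return new ArrayList<>(committables);
    }

    public static void main(String[] args) {
        RetryEmitAsymmetryDemo sink = new RetryEmitAsymmetryDemo();
        sink.notifyCheckpointCompleted(List.of("checkpoint-1-committables"));
        sink.retryAfterRecovery(List.of("checkpoint-2-committables"));
        // Prints [checkpoint-1-committables]: the retried checkpoint never
        // reaches the global committer, matching the 20 missing records.
        System.out.println(sink.globalCommitterInput);
    }
}
{code}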


> Data Loss in Sink GlobalCommitter during Task Manager recovery
> --------------------------------------------------------------
>
>                 Key: FLINK-29589
>                 URL: https://issues.apache.org/jira/browse/FLINK-29589
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.14.5, 1.14.6
>            Reporter: Krzysztof Chmielewski
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)