Posted to issues@flink.apache.org by "zlzhang0122 (Jira)" <ji...@apache.org> on 2022/01/12 11:56:00 UTC

[jira] [Comment Edited] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

    [ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474472#comment-17474472 ] 

zlzhang0122 edited comment on FLINK-24207 at 1/12/22, 11:55 AM:
----------------------------------------------------------------

[~roman]  Sorry for the late reply. In our use case, although the third-party component doesn't support transactions or idempotent writes, we still want to ensure end-to-end exactly-once semantics using 2PC, so we use ListState to cache the data. When checkpointing, we snapshot the ListState, and when the CheckpointCoordinator calls notifyCheckpointComplete, we commit the contents of the ListState to the third-party component and delete them from the state. Of course, the commit or the delete may fail; we deal with that by retrying or by failing the whole job. For a repeated commit of the same batch of data, the third-party component can ensure idempotence.
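
To make the flow above concrete, here is a minimal plain-Java simulation of it. It is only a sketch under assumptions: it does not use the Flink API (the method names merely mirror snapshotState and notifyCheckpointComplete), in-memory collections stand in for the ListState, and ExternalStore, InMemoryStore and BufferingSink are hypothetical names invented for illustration. It also does not model restoring state after a failure.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the third-party component. */
interface ExternalStore {
    void commitBatch(long checkpointId, List<String> records) throws Exception;
}

/** In-memory store; re-committing an already applied batch id is a no-op (assumed idempotence per batch). */
class InMemoryStore implements ExternalStore {
    final Map<Long, List<String>> applied = new TreeMap<>();
    int failuresToInject = 0; // number of upcoming commits that should fail

    @Override
    public void commitBatch(long checkpointId, List<String> records) throws Exception {
        if (failuresToInject > 0) {
            failuresToInject--;
            throw new Exception("simulated transient failure");
        }
        applied.putIfAbsent(checkpointId, new ArrayList<>(records));
    }
}

/** Simulated sink: buffers records, freezes them per checkpoint, commits on completion. */
class BufferingSink {
    private final ExternalStore store;
    private final int maxAttempts;
    // Records received since the last checkpoint (plays the role of the ListState).
    private List<String> current = new ArrayList<>();
    // Batches that were snapshotted but not yet committed, keyed by checkpoint id.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    BufferingSink(ExternalStore store, int maxAttempts) {
        this.store = store;
        this.maxAttempts = maxAttempts;
    }

    void invoke(String record) {
        current.add(record);
    }

    /** Pre-commit: freeze the current buffer under the checkpoint id. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Commit every batch up to checkpointId, deleting each one only after it succeeded. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            commitWithRetry(entry.getKey(), entry.getValue());
            it.remove();
        }
    }

    /** Retry a bounded number of times, then give up by throwing ("fail the whole job"). */
    private void commitWithRetry(long checkpointId, List<String> batch) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                store.commitBatch(checkpointId, batch);
                return;
            } catch (Exception e) {
                last = e;
            }
        }
        throw new IllegalStateException("commit of checkpoint " + checkpointId + " failed", last);
    }

    int pendingBatches() {
        return pending.size();
    }
}

class BufferingSinkDemo {
    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        BufferingSink sink = new BufferingSink(store, 3);
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);
        store.failuresToInject = 2;        // the first two commit attempts fail
        sink.notifyCheckpointComplete(1L); // the third attempt succeeds
        System.out.println("applied=" + store.applied + ", pending=" + sink.pendingBatches());
    }
}
```

Because a batch is removed from the pending map only after its commit succeeded, a failed commit leaves the data in place for a later attempt, and the store's per-batch idempotence is what makes such a repeated commit safe.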


was (Author: zlzhang0122):
[~roman]  Sorry for the late reply. In our use case, although the third-party component doesn't support transactions or idempotent writes, we still want to ensure end-to-end exactly-once semantics using 2PC, so we use ListState to cache the data. When checkpointing, we snapshot the ListState, and when the CheckpointCoordinator calls notifyCheckpointComplete, we commit the contents of the ListState to the third-party downstream and delete them from the state. Of course, the commit or the delete may fail; we deal with that by retrying or by failing the whole job. For a repeated commit of the same batch of data, the third-party component can ensure idempotence.

> Add support of KeyedState in TwoPhaseCommitSinkFunction
> -------------------------------------------------------
>
>                 Key: FLINK-24207
>                 URL: https://issues.apache.org/jira/browse/FLINK-24207
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.12.2, 1.13.1
>            Reporter: zlzhang0122
>            Priority: Major
>
> Currently, the implementation of TwoPhaseCommitSinkFunction is based on operator state, but operator state is deep-copied when a checkpoint is taken, so large operator state may cause an OOM error. Adding support for KeyedState in TwoPhaseCommitSinkFunction may be a good way to avoid the OOM error and make things more convenient for users.


