You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Danny Cranmer (Jira)" <ji...@apache.org> on 2022/11/01 11:22:00 UTC

[jira] [Commented] (FLINK-29827) [Connector][AsyncSinkWriter] Checkpointed states block writer from sending records

    [ https://issues.apache.org/jira/browse/FLINK-29827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17627112#comment-17627112 ] 

Danny Cranmer commented on FLINK-29827:
---------------------------------------

[~mc8max] Thanks for reporting this issue. I agree it sounds like a bug, I will follow up.

> [Connector][AsyncSinkWriter] Checkpointed states block writer from sending records
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-29827
>                 URL: https://issues.apache.org/jira/browse/FLINK-29827
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.15.2
>            Reporter: Hoang Tri Tam
>            Priority: Major
>
> Hi every one,
> Recently we discovered an issue which blocks Sink operators from sending records to client's endpoint.
> To *reproduce* the issue, we started our Flink app from an existing savepoint, in which some Sink operators hold some buffered records. For instance, app employs KinesisStreamSink with a parallelism of 4. 2 of them has no buffered records, the other 2 start with existing states of some records, which are leftover from the previous run. 
> {*}Behavior{*}: during runtime, we sent records (let's say 200) to this sink in rebalance mode. But only 100 of them (50%) were dispatched from the sink operators.
> After {*}investigation{*}, we found that the implementation AsyncSinkWriter invokes submitRequestEntries() to send the records to their destination. This invocation is performed when a callback is performed, a flush(true) or forced-flush is called, or when the buffered is full (either in size or in quantity).
> The case falls in the first scenario: the _callback is not registered_ {_}when the writer starts with some existing buffered records{_}, initialized from savepoint. Hence in our case, those operators were holding records till their buffers become full, while other operators still perform the usual sending.
> Impacted {*}scope{*}: flink-1.15.2 or later version, for any Sink that implements AsyncSinkWriter.
> We currently treat this as an abnormal behavior of Flink, but please let me know if this behavior is intended by design.
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)