Posted to issues@flink.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/11/02 22:07:00 UTC

[jira] [Updated] (FLINK-29827) [Connector][AsyncSinkWriter] Checkpointed states block writer from sending records

     [ https://issues.apache.org/jira/browse/FLINK-29827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-29827:
-----------------------------------
    Labels: pull-request-available  (was: )

> [Connector][AsyncSinkWriter] Checkpointed states block writer from sending records
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-29827
>                 URL: https://issues.apache.org/jira/browse/FLINK-29827
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.15.2
>            Reporter: Hoang Tri Tam
>            Assignee: Ahmed Hamdy
>            Priority: Major
>              Labels: pull-request-available
>
> Hi everyone,
> We recently discovered an issue that blocks Sink operators from sending records to the client's endpoint.
> To *reproduce* the issue, we started our Flink app from an existing savepoint in which some Sink operators hold buffered records. For instance, the app uses KinesisStreamSink with a parallelism of 4: 2 of the subtasks have no buffered records, while the other 2 start with existing state containing records left over from the previous run.
> {*}Behavior{*}: during runtime, we sent records (say 200) to this sink in rebalance mode, but only 100 of them (50%) were dispatched by the sink operators. Rebalance distributes records round-robin, so each of the 4 subtasks receives about 50 records, and the 2 subtasks restored with buffered state held theirs.
> After {*}investigation{*}, we found that AsyncSinkWriter invokes submitRequestEntries() to send records to their destination. This happens when a registered callback fires, when a flush(true) (forced flush) is called, or when the buffer is full (either in size or in number of records).
> Our case falls under the first scenario: the _callback is not registered_ {_}when the writer starts with existing buffered records{_} restored from the savepoint. As a result, those operators held their records until their buffers became full, while the other operators kept sending records as usual.
> Impacted {*}scope{*}: Flink 1.15.2 and later versions, for any sink that extends AsyncSinkWriter.
> We currently consider this abnormal behavior in Flink, but please let me know if it is intended by design.
> Thanks in advance.
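The flush triggers described in the report can be modeled with a small, self-contained Java sketch. It is not Flink's AsyncSinkWriter: the class SimplifiedBufferedWriter, its fields, and the registerTimerOnRestore switch are hypothetical, and a simulated clock stands in for Flink's processing-time timers. It assumes, as the report describes, that the time-based flush is only registered when an element is written into an empty buffer, so entries restored from state never trigger that registration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the flush triggers described in FLINK-29827.
// This is NOT Flink's AsyncSinkWriter; names and logic are illustrative only.
class SimplifiedBufferedWriter {
    private final int maxBatchSize;          // size-based trigger
    private final long maxTimeInBufferMs;    // time-based trigger (the "callback")
    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>();
    private Long timerDeadline = null;       // null = no time-based flush registered

    SimplifiedBufferedWriter(int maxBatchSize, long maxTimeInBufferMs,
                             List<String> restoredState, boolean registerTimerOnRestore,
                             long now) {
        this.maxBatchSize = maxBatchSize;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
        // Restored entries go straight into the buffer, bypassing write().
        buffer.addAll(restoredState);
        // Hypothetical fix: arm the timer if the restored buffer is non-empty.
        if (registerTimerOnRestore && !buffer.isEmpty()) {
            registerTimer(now);
        }
    }

    void write(String element, long now) {
        // The timer is armed only when an element lands in an EMPTY buffer,
        // so a buffer pre-filled from state never arms it.
        if (buffer.isEmpty() && timerDeadline == null) {
            registerTimer(now);
        }
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush(); // size-based trigger
        }
    }

    // Simulates processing time advancing; fires the timer if it is due.
    void advanceTime(long now) {
        if (timerDeadline != null && now >= timerDeadline) {
            timerDeadline = null;
            flush(); // time-based trigger
        }
    }

    private void registerTimer(long now) {
        timerDeadline = now + maxTimeInBufferMs;
    }

    // Stand-in for submitRequestEntries(): drain everything to the destination.
    private void flush() {
        while (!buffer.isEmpty()) {
            sent.add(buffer.poll());
        }
    }

    List<String> sent() { return sent; }

    int buffered() { return buffer.size(); }
}
```

In this model, restoring with List.of("old-1"), writing "a" and "b" at time 0, and then advancing to time 1000 leaves all three entries buffered and nothing sent when registerTimerOnRestore is false; the entries only leave once maxBatchSize records accumulate, which mirrors the symptom in the report. With registerTimerOnRestore set to true, the timer fires and all three entries are sent.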



--
This message was sent by Atlassian Jira
(v8.20.10#820010)