Posted to dev@flink.apache.org by "Hoang Tri Tam (Jira)" <ji...@apache.org> on 2022/11/01 07:46:00 UTC

[jira] [Created] (FLINK-29827) [Connector][AsyncSinkWriter] Checkpointed states block writer from sending records

Hoang Tri Tam created FLINK-29827:
-------------------------------------

             Summary: [Connector][AsyncSinkWriter] Checkpointed states block writer from sending records
                 Key: FLINK-29827
                 URL: https://issues.apache.org/jira/browse/FLINK-29827
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common
    Affects Versions: 1.15.2
            Reporter: Hoang Tri Tam


Hi everyone,

Recently we discovered an issue that blocks Sink operators from sending records to the client's endpoint.

To *reproduce* the issue, we started our Flink app from an existing savepoint in which some Sink operators hold buffered records. For instance, the app employs KinesisStreamSink with a parallelism of 4. Two of the subtasks have no buffered records; the other two start with existing state containing some records left over from the previous run.

{*}Behavior{*}: during runtime, we sent records (say 200) to this sink in rebalance mode, but only 100 of them (50%) were dispatched by the sink operators.

After {*}investigation{*}, we found that AsyncSinkWriter invokes submitRequestEntries() to send records to their destination. This invocation happens in one of three situations: when a registered callback fires, when flush(true) (a forced flush) is called, or when the buffer is full (either in size or in number of records).

Our case falls under the first scenario: the _callback is not registered_ {_}when the writer starts with some existing buffered records{_} initialized from a savepoint. Hence, in our case, those operators held their records until their buffers became full, while the other operators kept sending as usual.
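
To make the suspected mechanism concrete, here is a minimal toy model in Java. It is not Flink's actual code: the class ToyBufferingWriter and all of its methods are hypothetical, and it assumes that the callback is armed only when a record is added to an empty buffer. Under that assumption, a writer restored with leftover records never arms the callback and can only flush once its buffer fills up.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/**
 * Toy model of a buffering sink writer (hypothetical, NOT Flink's AsyncSinkWriter).
 * Assumption: the timer callback is armed only when a record arrives at an empty buffer.
 */
final class ToyBufferingWriter {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int maxBatchSize;
    private boolean timerRegistered = false;
    private int sent = 0;

    ToyBufferingWriter(int maxBatchSize, List<String> restoredRecords) {
        this.maxBatchSize = maxBatchSize;
        // Restored state goes straight into the buffer, bypassing write(),
        // so no timer callback is armed for these records.
        buffer.addAll(restoredRecords);
    }

    void write(String record) {
        // The timer is armed only on the transition from empty to non-empty.
        if (buffer.isEmpty() && !timerRegistered) {
            timerRegistered = true;
        }
        buffer.add(record);
        // A full buffer always triggers a flush, with or without a timer.
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Simulates the timer firing; does nothing if no callback was armed. */
    void onTimer() {
        if (!timerRegistered) {
            return;
        }
        timerRegistered = false;
        flush();
    }

    private void flush() {
        sent += buffer.size();
        buffer.clear();
    }

    int sent() {
        return sent;
    }

    int buffered() {
        return buffer.size();
    }
}
```

In this model, a writer created with an empty restored list sends its record as soon as onTimer() runs, whereas a writer created with leftover records keeps everything buffered until maxBatchSize is reached. This mirrors the split we observed between the two groups of subtasks.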

Impacted {*}scope{*}: Flink 1.15.2 or later versions, for any Sink whose writer extends AsyncSinkWriter.

We currently treat this as abnormal behavior in Flink, but please let me know if it is intended by design.

Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)