Posted to dev@flink.apache.org by "John Karp (Jira)" <ji...@apache.org> on 2022/03/15 15:23:00 UTC

[jira] [Created] (FLINK-26657) Resilient Kinesis consumption

John Karp created FLINK-26657:
---------------------------------

             Summary: Resilient Kinesis consumption
                 Key: FLINK-26657
                 URL: https://issues.apache.org/jira/browse/FLINK-26657
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kinesis
            Reporter: John Karp


Currently, any error while reading from a Kinesis stream quickly results in a job-killing failure. If the error is not classified as 'recoverable', the job fails immediately; if it is 'recoverable', there is a fixed number of retries before the job fails, and for some operations such as GetRecords those retries can be exhausted in just a few seconds. Furthermore, KinesisProxy.isRecoverableSdkClientException() and KinesisProxy.isRecoverableException() recognize only very narrow categories of errors as recoverable.

For example, if a Flink job is aggregating Kinesis streams from multiple regions, a single-region outage prevents it from making forward progress on data from any region, since the job will likely fail before any checkpoint can be completed. For some use cases, it is better to proceed with processing most of the data than to wait indefinitely for the problematic region to recover.

One mitigation is to set all of the retry counts and backoff limits in ConsumerConfigConstants very high. However, this only affects error handling for 'recoverable' exceptions, and depending on the nature of the regional failure, the resulting errors may not be classified as 'recoverable'.
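
As a rough illustration of that mitigation, the consumer properties might be built along the following lines. This is only a sketch: the property keys are written as plain strings from memory of ConsumerConfigConstants and AWSConfigConstants and should be checked against the connector version in use, and the class name, region, and numeric values are made up for the example.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink. Builds consumer properties with
// very generous retry settings for the polling (GetRecords) code path.
final class HighRetryKinesisConfig {

    static Properties build(String region) {
        Properties props = new Properties();
        // Assumed key for AWSConfigConstants.AWS_REGION.
        props.setProperty("aws.region", region);
        // Assumed keys for the GetRecords retry count and backoff (milliseconds).
        props.setProperty("flink.shard.getrecords.maxretries", "100000");
        props.setProperty("flink.shard.getrecords.backoff.base", "1000");
        props.setProperty("flink.shard.getrecords.backoff.max", "60000");
        // Assumed key for the GetShardIterator retry count.
        props.setProperty("flink.shard.getiterator.maxretries", "100000");
        return props;
    }
}
```

The resulting Properties object would be passed to the Kinesis consumer's constructor. Even with these values, an exception that the proxy does not classify as 'recoverable' still fails the job on first occurrence.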

Proposed mitigation: add a 'soft failure' mode to the Kinesis consumer, in which most errors arising from Kinesis operations are considered recoverable and retries are unlimited. (A possible exception is EFO de-registration, which I'm assuming needs to complete in a timely fashion. Also, it looks like ExpiredIteratorException needs to bubble up to PollingRecordPublisher.getRecords() without retries.)
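
To make the proposal concrete, here is a minimal sketch of what such a retry loop could look like. It is not taken from the connector: SoftFailureRetry, ExpiredIteratorStandIn, and all method names are hypothetical, and the stand-in exception merely plays the role of the SDK's ExpiredIteratorException so that the example is self-contained.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a 'soft failure' retry loop; not Flink code.
final class SoftFailureRetry {

    /** Stand-in for the SDK's ExpiredIteratorException (assumption for this sketch). */
    static final class ExpiredIteratorStandIn extends RuntimeException {}

    /** In soft-failure mode, every error is retried except those the caller must see. */
    static boolean isRecoverable(RuntimeException e) {
        return !(e instanceof ExpiredIteratorStandIn);
    }

    /** Exponential backoff: base * 2^attempt, capped at maxMillis and overflow-safe. */
    static long backoffMillis(long baseMillis, long maxMillis, int attempt) {
        long delay = Math.min(baseMillis, maxMillis);
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }

    /** Retries op without an attempt limit until it succeeds or a non-recoverable error occurs. */
    static <T> T callWithSoftFailure(Supplier<T> op, long baseMillis, long maxMillis) {
        int attempt = 0;
        while (true) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                if (!isRecoverable(e)) {
                    throw e; // bubble up, e.g. so the caller can refresh the shard iterator
                }
            }
            try {
                Thread.sleep(backoffMillis(baseMillis, maxMillis, attempt));
            } catch (InterruptedException ie) {
                // Preserve the interrupt so job cancellation is not swallowed by the loop.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", ie);
            }
            attempt = Math.min(attempt + 1, 62); // keep the counter bounded
        }
    }
}
```

Capping the backoff keeps the consumer polling a failed region at a steady, low rate, so shards from healthy regions continue to be read and checkpoints can still complete. Operations that must finish promptly, such as EFO de-registration, would presumably bypass a loop like this.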




--
This message was sent by Atlassian Jira
(v8.20.1#820001)