Posted to dev@flink.apache.org by Robert Metzger <rm...@apache.org> on 2020/07/03 07:31:19 UTC

Re: Does FlinkKinesisConsumer not retry on NoHttpResponseException?

For the others on the dev@ list: I responded on SO.

On Tue, Jun 16, 2020 at 7:56 AM Singh Aulakh, Karanpreet KP
<ka...@amazon.com.invalid> wrote:

> Hello!
>
> (Apache Flink 1.8 on AWS EMR release label 5.28.x)
>
> Our data source is an AWS Kinesis stream (with 450 shards, if that
> matters). We use the FlinkKinesisConsumer to read the Kinesis stream. Our
> application occasionally (once every couple of days) crashes with a "Target
> server failed to respond" error. The full stack trace is at the bottom.
>
> Looking further into the codebase, I found that
> 'ProvisionedThroughputExceededException' is the only exception type that
> is retried. Code:
> https://github.com/apache/flink/blob/2b0a8ceeb131c938d2e41dfee66099bfa5f366ae/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L365
>
> 1. Why is a transient HTTP response exception not retried by the
> Kinesis connector?
> 2. Is there a way I can pass in a retry configuration that will retry on
> these errors?
>
> As a side note, we set the following restart strategy:
>
> env.setRestartStrategy(RestartStrategies.failureRateRestart(
>     12,
>     org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),
>     org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)));
>
> Full stack trace of the exception -
>
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
>     at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> (Shamelessly copy-pasting from the Stack Overflow question I posted:
> https://stackoverflow.com/questions/62399248/flinkkinesisconsumer-does-not-retry-on-nohttpresponseexception)
>
> --
> KP
>
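
For readers of the archive: the retry behaviour asked about in question 2 of the quoted message can be sketched generically as below. This is not the connector's code and not a documented Flink option. The class name TransientRetrySketch, the method names, and the backoff parameters are all hypothetical, and plain java.io.IOException stands in for the shaded NoHttpResponseException (Apache HttpClient's NoHttpResponseException is a subclass of IOException). It only illustrates the idea of classifying a failure as transient and retrying it with capped exponential backoff.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Flink or the AWS SDK.
final class TransientRetrySketch {

    // Assumption: any IOException in the cause chain counts as transient.
    // SDK clients often wrap the underlying I/O error, so walk the chain.
    static boolean isTransient(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof IOException) {
                return true;
            }
        }
        return false;
    }

    // Runs the call, retrying transient failures up to maxRetries times
    // with exponential backoff capped at maxBackoffMillis.
    static <T> T callWithRetry(Callable<T> call,
                               int maxRetries,
                               long baseBackoffMillis,
                               long maxBackoffMillis) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception e) {
                // Non-transient failures and exhausted retries are rethrown as-is.
                if (!isTransient(e) || attempt >= maxRetries) {
                    throw e;
                }
                // Cap the shift so the multiplier cannot overflow.
                long factor = 1L << Math.min(attempt, 20);
                long backoff = Math.min(maxBackoffMillis, baseBackoffMillis * factor);
                Thread.sleep(backoff);
                attempt++;
            }
        }
    }
}
```

With a wrapper like this, a call that fails twice with "Target server failed to respond" and then succeeds returns normally on the third attempt, while an exception that is not classified as transient is rethrown immediately and would still reach the job's restart strategy.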