Posted to dev@flink.apache.org by "Singh Aulakh, Karanpreet KP" <ka...@amazon.com.INVALID> on 2020/06/16 00:54:07 UTC

Does FlinkKinesisConsumer not retry on NoHttpResponseException?

Hello!

(Apache Flink 1.8 on AWS EMR release label 5.28.x)

Our data source is an AWS Kinesis stream (with 450 shards, if that matters). We use the FlinkKinesisConsumer to read the Kinesis stream. Our application occasionally (once every couple of days) crashes with a "Target server failed to respond" error. The full stack trace is at the bottom.

Looking more into the codebase, I found that 'ProvisionedThroughputExceededException' is the only exception type that is retried (code: https://github.com/apache/flink/blob/2b0a8ceeb131c938d2e41dfee66099bfa5f366ae/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L365).
1. Why is a transient HTTP response exception not retried by the Kinesis connector?
2. Is there a way to pass in a retry configuration that will retry on these errors?
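
For question 2, the behavior being asked for amounts to a bounded retry with exponential backoff around the GetRecords call, applied to a chosen set of exception types. The following is a minimal standalone sketch of that idea under stated assumptions; it is not part of the Flink Kinesis connector or the AWS SDK. The class RetrySketch, its parameters, and the choice of java.io.IOException as the "transient" type are all hypothetical, and nothing here plugs into FlinkKinesisConsumer.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/**
 * Hypothetical retry helper (not a Flink or AWS SDK API). Re-invokes a call
 * when it throws an exception of the type treated as transient, sleeping with
 * exponential backoff between attempts, and rethrows once retries run out.
 */
final class RetrySketch {

    private RetrySketch() {}

    static <T> T callWithRetry(Callable<T> call,
                               Class<? extends Exception> transientType, // assumed "transient" type
                               int maxRetries,
                               long baseBackoffMillis,
                               long maxBackoffMillis) throws Exception {
        long backoff = Math.min(baseBackoffMillis, maxBackoffMillis);
        for (int attempt = 0; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // Non-transient errors, and transient ones after the last retry, propagate.
                if (!transientType.isInstance(e) || attempt >= maxRetries) {
                    throw e;
                }
                Thread.sleep(backoff);
                // Double the backoff for the next attempt, capped at maxBackoffMillis (overflow-safe).
                backoff = backoff > maxBackoffMillis / 2 ? maxBackoffMillis : backoff * 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated GetRecords call: fails twice with an IOException, then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("Target server failed to respond");
            }
            return "records";
        }, IOException.class, 5, 10, 100);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Running main prints "records after 3 attempts": the first two calls fail with an IOException, each followed by a backoff sleep (10 ms, then 20 ms), and the third succeeds. An exception of any other type propagates on its first occurrence, so fatal errors are not masked.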

As a side note, we set the following restart strategy:

    env.setRestartStrategy(RestartStrategies.failureRateRestart(
            12,                                                               // max failures per interval
            org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),   // failure-rate measuring interval
            org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)   // delay before each restart
    ));
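
For reference, failureRateRestart(12, 60 minutes, 300 seconds) restarts the job after each failure (waiting 300 seconds first) and fails the job for good once the failure rate exceeds 12 failures per 60-minute interval. The sketch below is a simplified, hypothetical model of that sliding-window check; the class FailureRateModel and its methods are illustrative only and are not Flink's internal implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Simplified, hypothetical model of a failure-rate restart policy: a restart
 * is allowed while at most maxFailures failures fall within the last
 * intervalMillis. Not Flink's actual implementation.
 */
final class FailureRateModel {
    private final int maxFailures;
    private final long intervalMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateModel(int maxFailures, long intervalMillis) {
        this.maxFailures = maxFailures;
        this.intervalMillis = intervalMillis;
    }

    /** Records a failure at nowMillis and returns whether the job may restart. */
    boolean onFailure(long nowMillis) {
        // Drop failures that have slid out of the measuring window.
        while (!failureTimes.isEmpty() && nowMillis - failureTimes.peekFirst() >= intervalMillis) {
            failureTimes.pollFirst();
        }
        failureTimes.addLast(nowMillis);
        // Restart only while the window holds no more than maxFailures failures.
        return failureTimes.size() <= maxFailures;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        FailureRateModel policy = new FailureRateModel(12, 60 * minute);
        // Illustrative timestamps only: this model ignores the 300 s restart delay.
        for (int i = 1; i <= 13; i++) {
            boolean restart = policy.onFailure(i * minute);
            System.out.println("failure " + i + " -> " + (restart ? "restart" : "job fails"));
        }
    }
}
```

With the configured values, the model allows the first 12 failures inside the window and reports "job fails" on the 13th; failures older than 60 minutes no longer count against the limit.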

Full stack trace of the exception:

    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
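
The frames show the error surfacing from the AWS SDK's AmazonHttpClient (in handleRetryableException) underneath KinesisProxy.getRecords. When classifying an error like this as transient, the HTTP-level exception is typically found in the cause chain of the SDK's exception rather than as the top-level type; Apache HttpClient's NoHttpResponseException, for example, is a java.io.IOException subclass. The sketch below walks a cause chain looking for an IOException. The class TransientCauseCheck and the rule "any IOException in the chain is transient" are assumptions for illustration, not the connector's actual classification logic.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/**
 * Hypothetical helper: reports whether a throwable or any of its causes is an
 * IOException. Treating every IOException as transient is an assumption.
 */
final class TransientCauseCheck {

    private TransientCauseCheck() {}

    static boolean hasIoCause(Throwable t) {
        // Track visited throwables by identity so a cyclic cause chain cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur instanceof IOException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for a client exception wrapping the HTTP-level failure.
        RuntimeException wrapped = new RuntimeException(
                "wrapper", new IOException("Target server failed to respond"));
        System.out.println(hasIoCause(wrapped));                     // true
        System.out.println(hasIoCause(new IllegalStateException())); // false
    }
}
```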

(Shamelessly copy-pasting from the Stack Overflow question I posted: https://stackoverflow.com/questions/62399248/flinkkinesisconsumer-does-not-retry-on-nohttpresponseexception )

--
KP

Re: Does FlinkKinesisConsumer not retry on NoHttpResponseException?

Posted by Robert Metzger <rm...@apache.org>.
For the others on the dev@ list: I responded on SO.
