Posted to user@flink.apache.org by Jeff Henrikson <je...@gmail.com> on 2020/05/02 02:30:12 UTC

flink-s3-fs-hadoop retry configuration

Hello Flink users,

I could use help with three related questions:

1) How can I observe retries in the flink-s3-fs-hadoop connector?

2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
the hadoop configuration I have provided, as opposed to some separate
default configuration?  My job fails quickly when I read larger or more 
numerous objects from S3.  I conjecture the failure may be related to 
insufficient retries when S3 throttles.

3) What S3 fault recovery approach would you recommend?

Background:

I am having trouble with reliable operation of the flink-s3-fs-hadoop
connector.  My application sources all its DataStream data from S3, and
appears to get frequently throttled by S3:

     Caused by:
     org.apache.flink.streaming.runtime.tasks.AsynchronousException:
     Caught exception when processing split: [0]
     s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
     1586911084000 : 0 + 33554432
     . . .
     Caused by: java.io.InterruptedIOException: Failed to open
     s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on
     s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
     com.amazonaws.SdkClientException: Unable to execute HTTP request:
     Timeout waiting for connection from pool

The S3 throttling does not seem to trigger retries, and so it causes
the job to fail.  As a troubleshooting data point, the job stays up for
much longer if I reduce the S3 inputs to my job by disabling functionality.

I see in the documentation for hadoop-aws that there are properties
such as fs.s3.maxRetries and fs.s3.sleepTimeSeconds for handling retries
within Hadoop.
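
As a rough illustration of what a retry count and a sleep time of this
kind typically control, here is a minimal Python sketch of the general
pattern.  It is not hadoop-aws code: the function call_with_retries and
its parameters max_retries, sleep_seconds and sleep are hypothetical names
chosen for this example, and treating IOError as the retryable failure is
an assumption.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    operation: Callable[[], T],
    max_retries: int,
    sleep_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation; on IOError, wait and retry up to max_retries times."""
    attempt = 0
    while True:
        try:
            return operation()
        except IOError:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(sleep_seconds)  # fixed pause between attempts


if __name__ == "__main__":
    attempts = []

    def flaky_read() -> str:
        # Hypothetical operation that fails twice, then succeeds.
        attempts.append(1)
        if len(attempts) < 3:
            raise IOError("simulated throttling")
        return "data"

    print(call_with_retries(flaky_read, max_retries=5, sleep_seconds=0.0))
```

If the retry budget is smaller than the number of consecutive failures,
the last error propagates to the caller, which is the behaviour a job
would see as a task failure.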

After wrangling with some classpath troubles, I managed to get
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to parse a set of Hadoop
configuration files {core/hdfs/mapred/yarn}-site.xml.  I can confirm
that the cluster parses the configuration by passing invalid XML and
seeing the cluster crash.

The puzzle with which I am now faced is that the configuration for 
retries and timeouts in core-site.xml seems to have no effect on the
application.

I deploy in Kubernetes with a custom Docker image.  For now, I have
not enabled the ZooKeeper-based HA.

See below for a frequent stacktrace that I interpret as likely to be
caused by s3 throttling.

Thanks in advance for any help.

Regards,


Jeff Henrikson



     2020-04-30 19:35:24
     org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
         at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
         at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
         at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
         at jdk.internal.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
         at akka.actor.Actor.aroundReceive(Actor.scala:517)
         at akka.actor.Actor.aroundReceive$(Actor.scala:515)
         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
     Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: [0] s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@ 1586911084000 : 0 + 33554432
         at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
         at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
     Caused by: java.io.InterruptedIOException: Failed to open s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
         at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
         at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
         at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
         at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:181)
         at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:327)
         at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:190)
         at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
         at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
         at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
         at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
         at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:188)
         at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:210)
         at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:320)
         at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:428)
         at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
         at org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
         at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:695)
         at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:483)
         at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:315)
     Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
         at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
         at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
         at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
         at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:182)
         at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
         ... 16 more
     Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
         at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
         at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
         at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
         at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
         at com.amazonaws.http.conn.$Proxy21.get(Unknown Source)
         at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
         at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
         at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
         at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
         at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
         at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
         ... 27 more


Re: flink-s3-fs-hadoop retry configuration

Posted by Jeff Henrikson <je...@gmail.com>.
Robert,

Thanks for the tip!

Before you replied, I did figure out, using btrace, that the keys should
go in flink-conf.yaml.  I instrumented the methods
org.apache.hadoop.conf.Configuration.get for the keys, and
org.apache.hadoop.conf.Configuration.substituteVars for the effective
values.  (There is a btrace bug where you can't just observe the return
value from .get directly.)

I did not see in the code any way to observe the effective configuration 
using logging.

Regards,


Jeff



On 5/8/20 7:29 AM, Robert Metzger wrote:
> I validated my assumption. Putting
> 
> s3.connection.maximum: 123456
> 
> into the flink-conf.yaml file results in the following DEBUG log output:
> 
> 2020-05-08 16:20:47,461 DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.connection.maximum as fs.s3a.connection.maximum to Hadoop config
> 
> I guess that is the recommended way of passing configuration into the S3 
> connectors of Flink.
> 
> You also asked how to detect retries: DEBUG-log level is helpful again. 
> I just tried connecting against an invalid port, and got these messages:
> 
> 2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-7: Shutdown connection
> 2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.execchain.MainClientExec                [] - Connection discarded
> 2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total kept alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
> 2020-05-08 16:26:37,671 DEBUG com.amazonaws.request                                        [] - Retrying Request: HEAD http://127.0.0.1:9000 /test/ Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 Mac_OS_X/10.15.3 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 scala/2.11.12, amz-sdk-invocation-id: 051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type: application/octet-stream, )
> 2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient                          [] - Retriable error detected, will retry in 4226ms, attempt number: 7
> 
> 
> Maybe it makes sense to set the log level to DEBUG only for
> "com.amazonaws.http.AmazonHttpClient".
> 
> How to configure the log level depends on the deployment method.
> Usually, it's done by replacing the first INFO with DEBUG in
> conf/log4j.properties. ("rootLogger.level = DEBUG")
> 
> 
> Best,
> Robert
> 
> On Fri, May 8, 2020 at 3:51 PM Robert Metzger <rmetzger@apache.org> wrote:
> 
>     Hey Jeff,
> 
>     Which Flink version are you using?
>     Have you tried configuring the S3 filesystem via Flink's config
>     YAML? AFAIK, all config parameters prefixed with "s3." are mirrored
>     into the Hadoop file system connector.
> 
> 
>     On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson <jehenrik27@gmail.com> wrote:
> 
>           > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
>           > the hadoop configuration I have provided, as opposed to some separate
>           > default configuration?
> 
>         I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that
>         core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have
>         around 20 different DataStreams being instantiated from S3, so if they
>         each require one connection to be healthy, then 15 is definitely not a
>         good value.
> 
>         However, I seem to be unable to override fs.s3a.connection.maximum using
>         my core-site.xml.  I am also unable to see the DEBUG level messages for
>         the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.
> 
>         So now I'm wondering:
> 
>               1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?
> 
>               2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
>               override the config?
> 
> 
>         Thanks in advance,
> 
> 
>         Jeff Henrikson
> 
> 
> 
>         https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded
> 
>         https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml
> 
>             <property>
>               <name>fs.s3a.connection.maximum</name>
>               <value>15</value>
>               <description>Controls the maximum number of simultaneous connections to S3.</description>
>             </property>

Re: flink-s3-fs-hadoop retry configuration

Posted by Robert Metzger <rm...@apache.org>.
I validated my assumption. Putting

s3.connection.maximum: 123456

into the flink-conf.yaml file results in the following DEBUG log output:

2020-05-08 16:20:47,461 DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - Adding Flink config entry for s3.connection.maximum as fs.s3a.connection.maximum to Hadoop config

I guess that is the recommended way of passing configuration into the S3
connectors of Flink.
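
As a rough illustration of the mirroring that the DEBUG line reports, here is a minimal Python sketch. It is not the actual HadoopConfigLoader code: the function name mirror_to_hadoop is hypothetical, and the sketch assumes that only the "s3." prefix is rewritten to "fs.s3a.", which is all the log line above confirms.

```python
# Sketch of the key mirroring suggested by the DEBUG log line.
# Assumption: only the "s3." prefix is handled; the real loader may do more.
FLINK_PREFIX = "s3."
HADOOP_PREFIX = "fs.s3a."


def mirror_to_hadoop(flink_conf):
    """Return the Hadoop-style entries implied by Flink entries starting with 's3.'."""
    hadoop_conf = {}
    for key, value in flink_conf.items():
        if key.startswith(FLINK_PREFIX):
            # Swap the Flink prefix for the Hadoop S3A prefix, keep the rest of the key.
            hadoop_conf[HADOOP_PREFIX + key[len(FLINK_PREFIX):]] = value
    return hadoop_conf


flink_conf = {
    "s3.connection.maximum": "123456",
    "taskmanager.numberOfTaskSlots": "4",  # unrelated key, not mirrored
}
mirrored = mirror_to_hadoop(flink_conf)
print(mirrored)
```

Under that assumption, a key written as s3.connection.maximum in flink-conf.yaml reaches the connector as fs.s3a.connection.maximum, so no core-site.xml is needed for it.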

You also asked how to detect retries: the DEBUG log level is helpful again. I
just tried connecting against an invalid port, and got these messages:

2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-7: Shutdown connection
2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.execchain.MainClientExec [] - Connection discarded
2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total kept alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
2020-05-08 16:26:37,671 DEBUG com.amazonaws.request [] - Retrying Request: HEAD http://127.0.0.1:9000 /test/ Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 Mac_OS_X/10.15.3 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 scala/2.11.12, amz-sdk-invocation-id: 051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type: application/octet-stream, )
2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient [] - Retriable error detected, will retry in 4226ms, attempt number: 7


Maybe it makes sense to set the log level to DEBUG only for the
"com.amazonaws.http.AmazonHttpClient" logger.

How to configure the log level depends on the deployment method. Usually,
it's done by replacing the first INFO with DEBUG in conf/log4j.properties
("rootLogger.level = DEBUG").
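
Once DEBUG output is available, the retry messages can be pulled out of a log file mechanically. The following Python sketch is only an illustration: the regular expression is modelled on the single AmazonHttpClient message quoted in this mail, the function name find_retries is hypothetical, and other SDK versions may word the message differently.

```python
import re

# Pattern modelled on the AmazonHttpClient DEBUG message quoted in this thread.
# Assumption: the wording is stable for the SDK version in use.
RETRY_RE = re.compile(
    r"Retriable error detected, will retry in (\d+)ms, attempt number: (\d+)"
)


def find_retries(log_text):
    """Return (delay_ms, attempt) pairs for every retry message in log_text."""
    # Collapse whitespace first, because a message may be wrapped across lines.
    flat = " ".join(log_text.split())
    return [(int(delay), int(attempt)) for delay, attempt in RETRY_RE.findall(flat)]


sample = """2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient
               [] - Retriable error detected, will retry in 4226ms, attempt
number: 7"""
retries = find_retries(sample)
print(retries)
```

If the list stays empty while the job fails, that would suggest the failing requests are not being retried at the SDK level, or that the logger is not at DEBUG.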


Best,
Robert

On Fri, May 8, 2020 at 3:51 PM Robert Metzger <rm...@apache.org> wrote:

> Hey Jeff,
>
> Which Flink version are you using?
> Have you tried configuring the S3 filesystem via Flink's  config yaml?
> Afaik all config parameters prefixed with "s3." are mirrored into the
> Hadoop file system connector.
>
>
> On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson <je...@gmail.com>
> wrote:
>
>>  > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
>>  > the hadoop configuration I have provided, as opposed to some separate
>>  > default configuration?
>>
>> I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that
>> core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have
>> around 20 different DataStreams being instantiated from S3, so if they
>> each require one connection to be healthy, then 15 is definitely not a
>> good value.
>>
>> However, I seem to be unable to override fs.s3a.connection.maximum using
>> my core-site.xml.  I am also unable to see the DEBUG level messages for
>> the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.
>>
>> So now I'm wondering:
>>
>>      1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?
>>
>>      2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
>>      override the config?

Re: flink-s3-fs-hadoop retry configuration

Posted by Robert Metzger <rm...@apache.org>.
Hey Jeff,

Which Flink version are you using?
Have you tried configuring the S3 filesystem via Flink's config YAML?
AFAIK all config parameters prefixed with "s3." are mirrored into the
Hadoop file system connector.


On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson <je...@gmail.com> wrote:

>  > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
>  > the hadoop configuration I have provided, as opposed to some separate
>  > default configuration?
>
> I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that
> core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have
> around 20 different DataStreams being instantiated from S3, so if they
> each require one connection to be healthy, then 15 is definitely not a
> good value.
>
> However, I seem to be unable to override fs.s3a.connection.maximum using
> my core-site.xml.  I am also unable to see the DEBUG level messages for
> the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.
>
> So now I'm wondering:
>
>      1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?
>
>      2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
>      override the config?

Re: flink-s3-fs-hadoop retry configuration

Posted by Jeff Henrikson <je...@gmail.com>.
 > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
 > the hadoop configuration I have provided, as opposed to some separate
 > default configuration?

I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that 
core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have 
around 20 different DataStreams being instantiated from S3, so if they 
each require one connection to be healthy, then 15 is definitely not a 
good value.
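
To make the arithmetic concrete, here is a toy Python model of the pool, not the real HTTP client. It assumes the worst case described above, where every reader holds its connection for as long as it is alive; the names POOL_SIZE, READERS and lease are hypothetical.

```python
import threading

POOL_SIZE = 15  # fs.s3a.connection.maximum in core-default-shaded.xml
READERS = 20    # approximate number of S3-backed DataStreams in the job

pool = threading.BoundedSemaphore(POOL_SIZE)


def lease(timeout_s=0.01):
    """Try to lease a connection; False models 'Timeout waiting for connection from pool'."""
    return pool.acquire(timeout=timeout_s)


# Assumption: every reader keeps its connection open, so nothing returns to the pool.
results = [lease() for _ in range(READERS)]
served = results.count(True)
starved = results.count(False)
print(served, starved)
```

In this model the five readers beyond the pool size time out, which would match the ConnectionPoolTimeoutException in the stack trace if the real readers behave similarly.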

However, I seem to be unable to override fs.s3a.connection.maximum using 
my core-site.xml.  I am also unable to see the DEBUG level messages for 
the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.

So now I'm wondering:

     1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?

     2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
     override the config?


Thanks in advance,


Jeff Henrikson



https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded

https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml

   <property>
     <name>fs.s3a.connection.maximum</name>
     <value>15</value>
     <description>Controls the maximum number of simultaneous connections to S3.</description>
   </property>



