You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jeff Henrikson <je...@gmail.com> on 2020/06/17 17:27:29 UTC

Re: flink-s3-fs-hadoop retry configuration

Robert,

Thanks for the tip!

Before you replied, I did figure out to put the keys in flink-conf.yaml, 
using btrace.  I instrumented the methods 
org.apache.hadoop.conf.Configuration.get for the keys, and 
org.apache.hadoop.conf.Configuration.substituteVars for effective 
values.  (There is a btrace bug where you can't just observe the return 
value from .get directly.)

I did not see in the code any way to observe the effective configuration 
using logging.

Regards,


Jeff



On 5/8/20 7:29 AM, Robert Metzger wrote:
> I validated my assumption. Putting
> 
> s3.connection.maximum: 123456
> 
> into the flink-conf.yaml file results in the following DEBUG log output:
> 
> 2020-05-08 16:20:47,461 DEBUG 
> org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding 
> Flink config entry for s3.connection.maximum as 
> fs.s3a.connection.maximum to Hadoop config
> 
> I guess that is the recommended way of passing configuration into the S3 
> connectors of Flink.
> 
> You also asked how to detect retries: DEBUG-log level is helpful again. 
> I just tried connecting against an invalid port, and got these messages:
> 
> 2020-05-08 16:26:37,671 DEBUG 
> org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - 
> http-outgoing-7: Shutdown connection
> 2020-05-08 16:26:37,671 DEBUG 
> org.apache.http.impl.execchain.MainClientExec                [] - 
> Connection discarded
> 2020-05-08 16:26:37,671 DEBUG 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - 
> Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total 
> kept alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
> 2020-05-08 16:26:37,671 DEBUG com.amazonaws.request                     
>                     [] - Retrying Request: HEAD http://127.0.0.1:9000 
> /test/ Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 
> Mac_OS_X/10.15.3 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 
> scala/2.11.12, amz-sdk-invocation-id: 
> 051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type: 
> application/octet-stream, )
> 2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient       
>                     [] - Retriable error detected, will retry in 4226ms, 
> attempt number: 7
> 
> 
> maybe it makes sense to set the log level only for 
> "com.amazonaws.http.AmazonHttpClient" to DEBUG.
> 
> How to configure the log level depends on the deployment method. 
> Usually, its done by replacing the first INFO with DEBUG in 
> conf/log4j.properties. ("rootLogger.level = DEBUG")
> 
> 
> Best,
> Robert
> 
> On Fri, May 8, 2020 at 3:51 PM Robert Metzger <rmetzger@apache.org 
> <ma...@apache.org>> wrote:
> 
>     Hey Jeff,
> 
>     Which Flink version are you using?
>     Have you tried configuring the S3 filesystem via Flink's  config
>     yaml? Afaik all config parameters prefixed with "s3." are mirrored
>     into the Hadoop file system connector.
> 
> 
>     On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson <jehenrik27@gmail.com
>     <ma...@gmail.com>> wrote:
> 
>           > 2) How can I tell if flink-s3-fs-hadoop is actually managing
>         to pick up
>           > the hadoop configuration I have provided, as opposed to some
>         separate
>           > default configuration?
> 
>         I'm reading the docs and source of flink-fs-hadoop-shaded.  I
>         see that
>         core-default-shaded.xml has fs.s3a.connection.maximum set to
>         15.  I have
>         around 20 different DataStreams being instantiated from S3, so
>         if they
>         each require one connection to be healthy, then 15 is definitely
>         not a
>         good value.
> 
>         However, I seem to be unable to override
>         fs.s3a.connection.maximum using
>         my core-site.xml.  I am also unable to see the DEBUG level
>         messages for
>         the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.
> 
>         So now I'm wondering:
> 
>               1) Anybody know how to see DEBUG output for
>         flink-fs-hadoop-shaded?
> 
>               2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
>               override the config?
> 
> 
>         Thanks in advance,
> 
> 
>         Jeff Henrikson
> 
> 
> 
>         https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded
> 
>         https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml
> 
>             <property>
>               <name>fs.s3a.connection.maximum</name>
>               <value>15</value>
>               <description>Controls the maximum number of simultaneous
>         connections to S3.</description>
>             </property>
> 
> 
> 
> 
>         On 5/1/20 7:30 PM, Jeff Henrikson wrote:
>          > Hello Flink users,
>          >
>          > I could use help with three related questions:
>          >
>          > 1) How can I observe retries in the flink-s3-fs-hadoop connector?
>          >
>          > 2) How can I tell if flink-s3-fs-hadoop is actually managing
>         to pick up
>          > the hadoop configuration I have provided, as opposed to some
>         separate
>          > default configuration?  My job fails quickly when I read
>         larger or more
>          > numerous objects from S3.  I conjecture the failure may be
>         related to
>          > insufficient retries when S3 throttles.
>          >
>          > 3) What s3 fault recovery approach would you recommend?
>          >
>          > Background:
>          >
>          > I am having trouble with reliable operation of the
>         flink-s3-fs-hadoop
>          > connector.   My application sources all its DataStream data
>         from S3, and
>          > appears to get frequently throttled by s3:
>          >
>          >      Caused by:
>          >     
>         org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>          >      Caught exception when processing split: [0]
>          >      s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
>          >      1586911084000 : 0 + 33554432
>          >      . . .
>          >      Caused by: java.io
>         <http://java.io>.InterruptedIOException: Failed to open
>          >      s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at
>         0 on
>          >      s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
>          >      com.amazonaws.SdkClientException: Unable to execute HTTP
>         request:
>          >      Timeout waiting for connection from pool
>          >
>          > The s3 throttling does not seem to trigger retries and so
>          > causes the job to fail.  For troubleshooting purposes, the
>         job stays up
>          > for much longer if I reduce s3 inputs to my job by disabling
>         functionality.
>          >
>          > I see in the documentation for hadoop-aws that there are
>         properties
>          > such as fs.s3.maxRetries fs.s3.sleepTimeSeconds for handling
>         retries
>          > within hadoop.
>          >
>          > After wrangling with some classpath troubles, I managed to get
>          > flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to parse a set of
>         hadoop
>          > configuration files {core/hdfs/mapred/yarn}-site.xml.  I can
>         confirm
>          > that the cluster parses the configuration by passing invalid
>         xml and
>          > seeing the cluster crash.
>          >
>          > The puzzle with which I am now faced is that the
>         configuration for
>          > retries and timeouts in core-site.xml seems to have no effect
>         on the
>          > application.
>          >
>          > I deploy in kubernetes with a custom docker image.  For now,
>         I have
>          > not enabled the zookeeper-based HA.
>          >
>          > See below for a frequent stacktrace that I interpret as
>         likely to be
>          > caused by s3 throttling.
>          >
>          > Thanks in advance for any help.
>          >
>          > Regards,
>          >
>          >
>          > Jeff Henrikson
>          >
>          >
>          >
>          >      2020-04-30 19:35:24
>          >      org.apache.flink.runtime.JobException: Recovery is
>         suppressed by
>          > NoRestartBackoffTimeStrategy
>          >          at
>          >
>         org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> 
>          >
>          >          at
>          > jdk.internal.reflect.GeneratedMethodAccessor66.invoke(Unknown
>         Source)
>          >          at
>          >
>         java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 
>          >
>          >          at
>         java.base/java.lang.reflect.Method.invoke(Method.java:566)
>          >          at
>          >
>         org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> 
>          >
>          >          at
>          >
>         org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> 
>          >
>          >          at akka.japi.pf
>         <http://akka.japi.pf>.UnitCaseStatement.apply(CaseStatements.scala:26)
>          >          at akka.japi.pf
>         <http://akka.japi.pf>.UnitCaseStatement.apply(CaseStatements.scala:21)
>          >          at
>         scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>          >          at
>         scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>          >          at
>          > akka.japi.pf
>         <http://akka.japi.pf>.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>          >          at
>          >
>         scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>          >          at
>          >
>         scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>          >          at
>          >
>         scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>          >          at akka.actor.Actor.aroundReceive(Actor.scala:517)
>          >          at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>          >          at
>         akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>          >          at
>         akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>          >          at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>          >          at
>         akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>          >          at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>          >          at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>          >          at
>          > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>          >          at
>          >
>         akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 
>          >
>          >          at
>          >
>         akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>          >          at
>          >
>         akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
>          >
>          >      Caused by:
>          >
>         org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>         Caught
>          > exception when processing split: [0]
>          > s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
>         1586911084000
>          > : 0 + 33554432
>          >          at
>          >
>         org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
> 
>          >
>          >          at
>          >
>         org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
> 
>          >
>          >          at
>          >
>         org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
> 
>          >
>          >      Caused by: java.io
>         <http://java.io>.InterruptedIOException: Failed to open
>          > s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on
>          > s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
>          > com.amazonaws.SdkClientException: Unable to execute HTTP
>         request:
>          > Timeout waiting for connection from pool
>          >          at
>          >
>         org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
> 
>          >
>          >          at
>          >
>         org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
>          >          at
>         org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
>          >          at
>          >
>         org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:181)
>          >          at
>          >
>         org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:327)
> 
>          >
>          >          at
>          > org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:190)
>          >          at
>         org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>          >          at
>          > org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>          >          at
>          >
>         org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>          >          at
>         org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>          >          at
>         org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:188)
>          >          at
>         org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:210)
>          >          at
>          >
>         org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:320)
>          >          at
>          >
>         org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:428)
>          >          at
>          > java.base/java.io
>         <http://java.io>.DataInputStream.read(DataInputStream.java:149)
>          >          at
>          >
>         org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
> 
>          >
>          >          at
>          >
>         org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:695)
> 
>          >
>          >          at
>          >
>         org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:483)
> 
>          >
>          >          at
>          >
>         org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:315)
> 
>          >
>          >      Caused by: com.amazonaws.SdkClientException: Unable to
>         execute HTTP
>          > request: Timeout waiting for connection from pool
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>          >          at
>          >
>         com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>          >          at
>          >
>         com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>          >          at
>          >
>         com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
> 
>          >
>          >          at
>          >
>         org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:182)
> 
>          >
>          >          at
>         org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>          >          ... 16 more
>          >      Caused by:
>         org.apache.http.conn.ConnectionPoolTimeoutException:
>          > Timeout waiting for connection from pool
>          >          at
>          >
>         org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
> 
>          >
>          >          at
>          >
>         org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
> 
>          >
>          >          at
>          > jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown
>         Source)
>          >          at
>          >
>         java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 
>          >
>          >          at
>         java.base/java.lang.reflect.Method.invoke(Method.java:566)
>          >          at
>          >
>         com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
> 
>          >
>          >          at com.amazonaws.http.conn.$Proxy21.get(Unknown Source)
>          >          at
>          >
>         org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
> 
>          >
>          >          at
>          >
>         org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
>          >          at
>          >
>         org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> 
>          >
>          >          at
>          >
>         org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> 
>          >
>          >          at
>          >
>         org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
> 
>          >
>          >          at
>          >
>         com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
> 
>          >
>          >          ... 27 more
>          >
>