Posted to user@flink.apache.org by Jeff Henrikson <je...@gmail.com> on 2020/06/17 17:27:29 UTC
Re: flink-s3-fs-hadoop retry configuration
Robert,
Thanks for the tip!
Before you replied, I had already figured out, using btrace, that the keys
belong in flink-conf.yaml. I instrumented
org.apache.hadoop.conf.Configuration.get to see the keys, and
org.apache.hadoop.conf.Configuration.substituteVars to see the effective
values. (There is a btrace bug where you can't just observe the return
value from .get directly.)
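
For anyone reading along: the key mapping Robert describes in the quoted
message (an "s3."-prefixed entry in flink-conf.yaml showing up in the Hadoop
config under "fs.s3a.") can be pictured with a small sketch. This is only an
illustration of the renaming seen in his DEBUG log, not Flink's actual
HadoopConfigLoader code; the function name mirror_s3_keys is made up for this
example.

```python
# Illustrative sketch only; NOT Flink's HadoopConfigLoader implementation.
FLINK_PREFIX = "s3."       # prefix used in flink-conf.yaml in this thread
HADOOP_PREFIX = "fs.s3a."  # prefix seen in the DEBUG log in this thread


def mirror_s3_keys(flink_conf):
    """Return Hadoop-style entries derived from Flink-style 's3.' keys.

    mirror_s3_keys is a hypothetical helper name used only in this sketch.
    """
    hadoop_conf = {}
    for key, value in flink_conf.items():
        if key.startswith(FLINK_PREFIX):
            # Swap the "s3." prefix for "fs.s3a." and keep the rest of the key.
            hadoop_key = HADOOP_PREFIX + key[len(FLINK_PREFIX):]
            hadoop_conf[hadoop_key] = value
    return hadoop_conf


flink_conf = {
    "s3.connection.maximum": "123456",     # value from Robert's test
    "taskmanager.numberOfTaskSlots": "4",  # unrelated key, left alone
}
print(mirror_s3_keys(flink_conf))
```

Keys without the "s3." prefix are simply not forwarded in this sketch, which
matches what I saw: only the prefixed entries reached the Hadoop
Configuration.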
I did not see in the code any way to observe the effective configuration
using logging.
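
That said, the two DEBUG messages Robert quotes below can be pulled out of a
log file with a short script. The sketch below assumes DEBUG logging is
enabled for the relevant loggers and that each log event sits on a single
line (the lines in the quoted mail are wrapped by the mail client). The
function name scan_log is hypothetical; the message texts are copied from
Robert's output.

```python
import re

# Message texts taken from the DEBUG output quoted in this thread.
MIRRORED = re.compile(
    r"Adding Flink config entry for (\S+) as (\S+) to Hadoop config")
RETRY = re.compile(
    r"Retriable error detected, will retry in (\d+)ms, attempt number: (\d+)")


def scan_log(lines):
    """Collect mirrored config keys and retry events from log lines.

    Returns (mirrored, retries): mirrored maps the Flink key to the Hadoop
    key; retries is a list of (attempt_number, delay_ms) tuples.
    """
    mirrored = {}
    retries = []
    for line in lines:
        m = MIRRORED.search(line)
        if m:
            mirrored[m.group(1)] = m.group(2)
            continue
        r = RETRY.search(line)
        if r:
            retries.append((int(r.group(2)), int(r.group(1))))
    return mirrored, retries


sample = [
    "2020-05-08 16:20:47,461 DEBUG "
    "org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - "
    "Adding Flink config entry for s3.connection.maximum as "
    "fs.s3a.connection.maximum to Hadoop config",
    "2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient [] - "
    "Retriable error detected, will retry in 4226ms, attempt number: 7",
]
mirrored, retries = scan_log(sample)
print(mirrored)
print(retries)
```

Note that the first message reports only the key names, not the value, so
this shows which keys were forwarded rather than the full effective
configuration.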
Regards,
Jeff
On 5/8/20 7:29 AM, Robert Metzger wrote:
> I validated my assumption. Putting
>
> s3.connection.maximum: 123456
>
> into the flink-conf.yaml file results in the following DEBUG log output:
>
> 2020-05-08 16:20:47,461 DEBUG
> org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - Adding
> Flink config entry for s3.connection.maximum as
> fs.s3a.connection.maximum to Hadoop config
>
> I guess that is the recommended way of passing configuration into the S3
> connectors of Flink.
>
> You also asked how to detect retries: DEBUG-log level is helpful again.
> I just tried connecting against an invalid port, and got these messages:
>
> 2020-05-08 16:26:37,671 DEBUG
> org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] -
> http-outgoing-7: Shutdown connection
> 2020-05-08 16:26:37,671 DEBUG
> org.apache.http.impl.execchain.MainClientExec [] -
> Connection discarded
> 2020-05-08 16:26:37,671 DEBUG
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] -
> Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total
> kept alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
> 2020-05-08 16:26:37,671 DEBUG com.amazonaws.request
> [] - Retrying Request: HEAD http://127.0.0.1:9000
> /test/ Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271
> Mac_OS_X/10.15.3 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252
> scala/2.11.12, amz-sdk-invocation-id:
> 051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type:
> application/octet-stream, )
> 2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient
> [] - Retriable error detected, will retry in 4226ms,
> attempt number: 7
>
>
> Maybe it makes sense to set the log level only for
> "com.amazonaws.http.AmazonHttpClient" to DEBUG.
>
> How to configure the log level depends on the deployment method.
> Usually, it's done by replacing the first INFO with DEBUG in
> conf/log4j.properties. ("rootLogger.level = DEBUG")
>
>
> Best,
> Robert
>
> On Fri, May 8, 2020 at 3:51 PM Robert Metzger <rmetzger@apache.org> wrote:
>
> Hey Jeff,
>
> Which Flink version are you using?
> Have you tried configuring the S3 filesystem via Flink's config
> yaml? Afaik all config parameters prefixed with "s3." are mirrored
> into the Hadoop file system connector.
>
>
> On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson <jehenrik27@gmail.com> wrote:
>
> > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
> > the hadoop configuration I have provided, as opposed to some separate
> > default configuration?
>
> I'm reading the docs and source of flink-fs-hadoop-shaded. I see that
> core-default-shaded.xml has fs.s3a.connection.maximum set to 15. I have
> around 20 different DataStreams being instantiated from S3, so if they
> each require one connection to be healthy, then 15 is definitely not a
> good value.
>
> However, I seem to be unable to override fs.s3a.connection.maximum using
> my core-site.xml. I am also unable to see the DEBUG level messages for
> the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.
>
> So now I'm wondering:
>
> 1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?
>
> 2) Am I going to end up rebuilding flink-fs-hadoop-shaded to override
> the config?
>
>
> Thanks in advance,
>
>
> Jeff Henrikson
>
>
>
> https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded
>
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml
>
> <property>
> <name>fs.s3a.connection.maximum</name>
> <value>15</value>
> <description>Controls the maximum number of simultaneous
> connections to S3.</description>
> </property>
>
>
>
>
> On 5/1/20 7:30 PM, Jeff Henrikson wrote:
> > Hello Flink users,
> >
> > I could use help with three related questions:
> >
> > 1) How can I observe retries in the flink-s3-fs-hadoop connector?
> >
> > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
> > the hadoop configuration I have provided, as opposed to some separate
> > default configuration? My job fails quickly when I read larger or more
> > numerous objects from S3. I conjecture the failure may be related to
> > insufficient retries when S3 throttles.
> >
> > 3) What s3 fault recovery approach would you recommend?
> >
> > Background:
> >
> > I am having trouble with reliable operation of the flink-s3-fs-hadoop
> > connector. My application sources all its DataStream data from S3, and
> > appears to get frequently throttled by s3:
> >
> > Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> > Caught exception when processing split: [0]
> > s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
> > 1586911084000 : 0 + 33554432
> > . . .
> > Caused by: java.io.InterruptedIOException: Failed to open
> > s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on
> > s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
> > com.amazonaws.SdkClientException: Unable to execute HTTP request:
> > Timeout waiting for connection from pool
> >
> > The s3 throttling does not seem to trigger retries and so
> > causes the job to fail. For troubleshooting purposes, the job stays up
> > for much longer if I reduce s3 inputs to my job by disabling functionality.
> >
> > I see in the documentation for hadoop-aws that there are properties
> > such as fs.s3.maxRetries and fs.s3.sleepTimeSeconds for handling retries
> > within hadoop.
> >
> > After wrangling with some classpath troubles, I managed to get
> > flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to parse a set of hadoop
> > configuration files {core/hdfs/mapred/yarn}-site.xml. I can confirm
> > that the cluster parses the configuration by passing invalid xml and
> > seeing the cluster crash.
> >
> > The puzzle with which I am now faced is that the configuration for
> > retries and timeouts in core-site.xml seems to have no effect on the
> > application.
> >
> > I deploy in kubernetes with a custom docker image. For now, I have
> > not enabled the zookeeper-based HA.
> >
> > See below for a frequent stacktrace that I interpret as likely to be
> > caused by s3 throttling.
> >
> > Thanks in advance for any help.
> >
> > Regards,
> >
> >
> > Jeff Henrikson
> >
> >
> >
> > 2020-04-30 19:35:24
> > org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> >     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> >     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> >     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> >     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> >     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> >     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
> >     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> >     at jdk.internal.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
> >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
> >     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> >     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> >     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> >     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> >     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> >     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> >     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >     at akka.actor.Actor.aroundReceive(Actor.scala:517)
> >     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> >     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> >     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: [0] s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@ 1586911084000 : 0 + 33554432
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
> >     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
> > Caused by: java.io.InterruptedIOException: Failed to open s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
> >     at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
> >     at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
> >     at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
> >     at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:181)
> >     at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:327)
> >     at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:190)
> >     at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> >     at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
> >     at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
> >     at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
> >     at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:188)
> >     at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:210)
> >     at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:320)
> >     at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:428)
> >     at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
> >     at org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
> >     at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:695)
> >     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:483)
> >     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:315)
> > Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> >     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
> >     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
> >     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
> >     at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
> >     at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:182)
> >     at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> >     ... 16 more
> > Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
> >     at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
> >     at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
> >     at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
> >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> >     at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
> >     at com.amazonaws.http.conn.$Proxy21.get(Unknown Source)
> >     at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
> >     at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
> >     at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> >     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> >     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
> >     at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
> >     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
> >     ... 27 more
> >
>