Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2019/01/16 12:36:00 UTC

[jira] [Closed] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues

     [ https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-11187.
------------------------------------
    Resolution: Fixed

> StreamingFileSink with S3 backend transient socket timeout issues 
> ------------------------------------------------------------------
>
>                 Key: FLINK-11187
>                 URL: https://issues.apache.org/jira/browse/FLINK-11187
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, Streaming Connectors
>    Affects Versions: 1.7.0, 1.7.1
>            Reporter: Addison Higham
>            Assignee: Addison Higham
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.2, 1.8.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> When using the StreamingFileSink with the S3A backend, errors like the following will occasionally occur:
> {noformat}
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended Request ID: xxx, S3 Extended Request ID: xxx
>        at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>        at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>        at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
> This causes a restart of the Flink job, from which it is usually able to recover, but under heavy load these restarts can become very frequent.
>  
> With debug logging enabled, you can find the following relevant stack trace:
> {noformat}
> 2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>   at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748){noformat}
> This error occurs when a transient failure happens while writing a multipart chunk and the underlying InputStream cannot be reset. That ResetException should be surfaced to the caller (as documented here: [https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html]), but for some reason it is not; instead, the client retries the request, now with a fully consumed InputStream. Because this InputStream is empty (or smaller than expected), the client can never supply the Content-Length that the multipart upload is expecting, so the socket hangs until it is eventually timed out.
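> To make the failure mode concrete, here is a minimal, self-contained sketch (plain JDK code, not Flink or SDK internals) of how a BufferedInputStream mark is invalidated once more bytes are read than the mark's read limit, which is exactly what produces the "Resetting to invalid mark" above; the buffer and part sizes are made up for illustration:
> {code:java}
> import java.io.BufferedInputStream;
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
>
> public class MarkResetDemo {
>     public static void main(String[] args) throws IOException {
>         byte[] part = new byte[1024];  // stand-in for a multipart chunk
>         BufferedInputStream in =
>                 new BufferedInputStream(new ByteArrayInputStream(part), 128);
>
>         in.mark(128);               // read limit smaller than the part
>         in.read(new byte[1024]);    // the first (failed) attempt consumes the stream
>
>         in.reset();                 // throws java.io.IOException: Resetting to invalid mark
>     }
> }
> {code}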
>  
> This failure happens roughly 20 times before the AWS client retry logic finally fails the request and the socket timeout exception is thrown.
>  
> As mentioned in the AWS best-practices doc, the preferred fix is to upload from a File or FileInputStream, or to set the stream's read limit via setReadLimit. I tried using the global SDK property (com.amazonaws.sdk.s3.defaultStreamBufferSize) to set this value, but that did not fix the problem; I believe this is because the InputStream is not mark-able and the AWS client does not wrap the stream.
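> As a minimal sketch of the read-limit approach (the helper and its parameter names are hypothetical; in Flink's bundled S3 filesystems the SDK classes sit under the org.apache.flink.fs.s3base.shaded prefix rather than com.amazonaws):
> {code:java}
> import java.io.InputStream;
> import com.amazonaws.services.s3.model.UploadPartRequest;
>
> public final class ReadLimitSketch {
>
>     static UploadPartRequest newPartRequest(
>             String bucket, String key, String uploadId,
>             int partNumber, InputStream partStream, long partSize) {
>
>         UploadPartRequest request = new UploadPartRequest()
>                 .withBucketName(bucket)
>                 .withKey(key)
>                 .withUploadId(uploadId)
>                 .withPartNumber(partNumber)
>                 .withInputStream(partStream)   // must be mark-able for reset to work
>                 .withPartSize(partSize);
>
>         // Mark far enough ahead that the whole part can be replayed on a retry;
>         // the AWS doc recommends part size + 1.
>         request.getRequestClientOptions().setReadLimit((int) partSize + 1);
>         return request;
>     }
> }
> {code}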
>  
> What is confirmed to work is the following patch: [https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6]
>  
> That is obviously not ideal, but it may suffice to simply make that read limit configurable.
>  
> The other option is to instead use the S3A WriteHelper's ability to upload from a file: pass a File to the S3AccessHelper and change the other relevant classes (RefCountedFSOutputStream) to expose their backing File object, handing it directly to the S3A WriteHelper, as sketched below.
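> A hypothetical sketch of that file-based alternative (the helper and its parameters are made up for illustration; only UploadPartRequest.withFile is the real SDK mechanism): because the SDK can re-open a File on every attempt, mark/reset never comes into play:
> {code:java}
> import java.io.File;
> import com.amazonaws.services.s3.AmazonS3;
> import com.amazonaws.services.s3.model.UploadPartRequest;
> import com.amazonaws.services.s3.model.UploadPartResult;
>
> public final class FileUploadSketch {
>
>     static UploadPartResult uploadPartFromFile(
>             AmazonS3 s3, String bucket, String key, String uploadId,
>             int partNumber, File partFile) {
>
>         UploadPartRequest request = new UploadPartRequest()
>                 .withBucketName(bucket)
>                 .withKey(key)
>                 .withUploadId(uploadId)
>                 .withPartNumber(partNumber)
>                 .withFile(partFile)            // the SDK re-reads the file on each retry
>                 .withPartSize(partFile.length());
>
>         return s3.uploadPart(request);
>     }
> }
> {code}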



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)