Posted to user@flink.apache.org by Flink Developer <de...@protonmail.com> on 2018/12/05 08:44:34 UTC

S3A AWSS3IOException from Flink's BucketingSink to S3

I have a Flink app with high parallelism (400) running in AWS EMR. It uses Flink v1.5.2. It reads from Kafka and sinks to S3 using BucketingSink (with the RocksDB backend for checkpointing). The destination is defined using the "s3a://" prefix. The Flink job is a streaming app which runs continuously. At any given time, each worker may be writing to a part file in S3, so all workers combined could be writing to up to 400 files (one per parallel subtask).
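
For reference, the overall shape of the job is roughly the following (a simplified sketch of the setup; the topic, broker, bucket paths, checkpoint interval, batch size, and writer are placeholders rather than the real values):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.StringWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    public class KafkaToS3Job {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(400);

            // RocksDB state backend, checkpointing to S3 (interval is a placeholder).
            env.setStateBackend(new RocksDBStateBackend("s3a://my-bucket/checkpoints"));
            env.enableCheckpointing(60_000);

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "broker:9092");
            kafkaProps.setProperty("group.id", "flink-s3-writer");

            DataStream<String> events = env.addSource(
                    new FlinkKafkaConsumer011<String>("events", new SimpleStringSchema(), kafkaProps));

            // One open part file per parallel subtask and bucket, so up to 400 files at a time.
            BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/output");
            sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd/HH"));
            sink.setWriter(new StringWriter<String>()); // placeholder; the real job writes gzipped part files
            sink.setBatchSize(128L * 1024L * 1024L);    // roll part files at ~128 MB

            events.addSink(sink);
            env.execute("Kafka to S3 via BucketingSink");
        }
    }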

After a few days, one of the workers will fail with the exception:

    org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)

This occurs when the BucketingSink creates a new part file. The odd thing is that the timing is random, yet it happens consistently across separate job executions. When it occurs, it affects only one of the parallel Flink workers, not all of them. Also, when this happens, the Flink job transitions into a FAILING state, but it does not restart and resume/recover from the last successful checkpoint.

What is the cause of this, and how can it be resolved? Also, how can the job be configured to restart and recover from the last successful checkpoint instead of staying in the FAILING state?
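
My understanding is that an explicit restart strategy along the following lines should allow the job to resume from the last successful checkpoint after a task failure (the attempt count and delay below are just placeholders):

    // Fragment, applied to the StreamExecutionEnvironment sketched above (uses
    // org.apache.flink.api.common.restartstrategy.RestartStrategies and
    // org.apache.flink.api.common.time.Time). With checkpointing enabled, a restart
    // strategy like this lets Flink restart the failed tasks and resume from the
    // last successful checkpoint.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            10,                               // give up after 10 consecutive failures
            Time.of(30, TimeUnit.SECONDS)));  // wait 30 seconds between restart attempts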

Re: S3A AWSS3IOException from Flink's BucketingSink to S3

Posted by Ken Krugler <kk...@transpac.com>.
Hi Generic Flink Developer,

Normally when you get an internal error from AWS, you also get a 500 status code - the 200 seems odd to me.

One thing I do know is that if you’re hitting S3 hard, you have to expect and recover from errors.

E.g. distcp jobs in Hadoop-land will auto-retry a failed request, because Things Go Wrong in AWS-land.

So it surprises me a bit that BucketingSink is using a raw S3AFileSystem. In the absence of Hadoop 3.1 support for S3A retry policies <https://issues.apache.org/jira/browse/HADOOP-13786>, it seems like Flink would want to wrap the S3AFileSystem with something that would retry requests which get an internal error.

But I haven’t walked that code, so maybe there is retry support somewhere else…
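
If it turns out there isn't, one knob that does exist in Hadoop 2.x is the retry count S3A hands to the AWS SDK (fs.s3a.attempts.maximum). I haven't tried this, but something along these lines might be worth experimenting with, assuming BucketingSink's setFSConfig will take a Hadoop Configuration:

    // Untested sketch: raise the retry count S3A passes to the AWS SDK and feed it
    // through the sink's filesystem configuration. "sink" is the BucketingSink from
    // the original post; I'm assuming setFSConfig accepts a Hadoop Configuration.
    org.apache.hadoop.conf.Configuration fsConf = new org.apache.hadoop.conf.Configuration();
    fsConf.setInt("fs.s3a.attempts.maximum", 30); // AWS SDK retry attempts; 30 is arbitrary
    sink.setFSConfig(fsConf);

Whether the SDK actually treats a 200 response carrying an InternalError body as retryable is a separate question, though.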

— Ken



> On Dec 9, 2018, at 1:37 PM, Flink Developer <de...@protonmail.com> wrote:
> 
> Hi, is there any idea on what causes this and how it can be resolved? Thanks.

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: S3A AWSS3IOException from Flink's BucketingSink to S3

Posted by Flink Developer <de...@protonmail.com>.
Hi, is there any idea on what causes this and how it can be resolved? Thanks.
