You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Aneesha Kaushal <an...@reflektion.com> on 2017/08/01 18:38:54 UTC

S3 Write Execption

Hello, 

I am using flink 1.2 and writing records to S3 using rolling sink.  

I am encountering this S3 write error quite frequently :

TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
	... 7 more

I am unable to find the cause of this error. Also, I have the following questions regarding this error : 

1) Do we loose any data or flink will go to last checkpoint and write again?
2) how can we prevent this error?

Thanks,
Aneesha



Re: S3 Write Execption

Posted by vinay patil <vi...@gmail.com>.
Hi Stephan,

I am facing S3 consistency related issue with the exception pasted at the
end:

We were able to solve the s3 sync issue by adding System.currentTime to
inprogressPrefix, inprogressSuffix, s3PendingPrefix and s3PendingSuffix
properties of BucketingSink.

I tried another approach by updating the BucketingSink code wherein I have
appended the partPath variable with System.currentTime (in openNewPartFile
method).

Can you please let me know if this is the correct approach in order to get
rid of this exception.

TimerException{java.io.IOException: Unable to create file due to concurrent
write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress}
         at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
         at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
         at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
         at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
         at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
         at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
         at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent
write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress
         at
com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
         at
com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
         at
com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
         at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
         at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
         at
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
         at
org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
         at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
         at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
         at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)


Regards,
Vinay Patil



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: S3 Write Execption

Posted by vinay patil <vi...@gmail.com>.
Hi Stephan,

I am facing S3 consistency related issue with the exception pasted at the
end:

We were able to solve the s3 sync issue by adding System.currentTime to
inprogressPrefix, inprogressSuffix, s3PendingPrefix and s3PendingSuffix
properties of BucketingSink.

I tried another approach by updating the BucketingSink code wherein I have
appended the partPath variable with System.currentTime (in openNewPartFile
method).

Can you please let me know if this is the correct approach in order to get
rid of this exception.

TimerException{java.io.IOException: Unable to create file due to concurrent
write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress}
         at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
         at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
         at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
         at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
         at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
         at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
         at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent
write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress
         at
com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
         at
com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
         at
com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
         at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
         at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
         at
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
         at
org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
         at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
         at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
         at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)


Regards,
Vinay Patil



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: S3 Write Execption

Posted by Stephan Ewen <se...@apache.org>.
It is very important to point out that the Bucketing sink can currently NOT
work properly on S3. It assumes a consistent file system (like listing /
renaming works consistently), and S3 is only eventually consistent. I
assume that this eventual consistency of S3 is the cause of your error.

There is a pull request for a bucketing sink on eventually consistent FS:
https://github.com/apache/flink/pull/3752
Hope we can merge this once we are done with the 1.3.2 release.

(cc-ing Gordon and Aljoscha, FYI)

On Wed, Aug 2, 2017 at 10:56 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Aneesha,
>
> the logs would show that Flink is going through a recovery cycle. Recovery
> means to cancel running tasks and start them again.
> If you don't see something like that in the logs, Flink continues to
> processing.
>
> I'm not familiar with the details of S3, so I can't tell if the exception
> indicates data loss.
>
> Best, Fabian
>
> 2017-08-01 20:38 GMT+02:00 Aneesha Kaushal <aneesha.kaushal@reflektion.com
> >:
>
>> Hello,
>>
>> I am using flink 1.2 and writing records to S3 using rolling sink.
>>
>> I am encountering this S3 write error quite frequently :
>>
>> TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
>> 	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> 	at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
>> 	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>> 	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>> 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>> 	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>> 	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
>> 	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
>> 	at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
>> 	at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
>> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
>> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
>> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
>> 	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
>> 	... 7 more
>>
>>
>> I am unable to find the cause of this error. Also, I have the following
>> questions regarding this error :
>>
>> 1) Do we loose any data or flink will go to last checkpoint and write
>> again?
>> 2) how can we prevent this error?
>>
>> Thanks,
>> Aneesha
>>
>>
>>
>

Re: S3 Write Execption

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Aneesha,

the logs would show that Flink is going through a recovery cycle. Recovery
means to cancel running tasks and start them again.
If you don't see something like that in the logs, Flink continues to
processing.

I'm not familiar with the details of S3, so I can't tell if the exception
indicates data loss.

Best, Fabian

2017-08-01 20:38 GMT+02:00 Aneesha Kaushal <an...@reflektion.com>:

> Hello,
>
> I am using flink 1.2 and writing records to S3 using rolling sink.
>
> I am encountering this S3 write error quite frequently :
>
> TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
> 	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
> 	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
> 	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
> 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
> 	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
> 	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
> 	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
> 	at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
> 	at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
> 	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
> 	... 7 more
>
>
> I am unable to find the cause of this error. Also, I have the following
> questions regarding this error :
>
> 1) Do we loose any data or flink will go to last checkpoint and write
> again?
> 2) how can we prevent this error?
>
> Thanks,
> Aneesha
>
>
>