Posted to user@flink.apache.org by vinay patil <vi...@gmail.com> on 2017/12/05 17:18:58 UTC

Re: S3 Write Execption

Hi Stephan,

I am facing an S3 consistency-related issue; the exception is pasted at the
end.

We were able to work around the S3 sync issue by appending System.currentTime
to the inprogressPrefix, inprogressSuffix, s3PendingPrefix, and s3PendingSuffix
properties of BucketingSink.
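The naming trick above can be sketched roughly as follows. This is a minimal, self-contained illustration only: the class `UniqueSinkNames` and the helper `uniquePrefix` are hypothetical names I am using for the example, and the actual Flink `BucketingSink` setter calls are only mentioned in comments, since they would require the flink-connector-filesystem dependency.

```java
// Hypothetical sketch of the workaround: make the in-progress/pending
// markers unique per sink instance by appending a timestamp, so two
// writers (e.g. before and after a job restart) never target the same
// S3 key such as part-0-0 with the same prefix/suffix.
public class UniqueSinkNames {

    // Build a prefix like "_1512494338000." from a base marker and a
    // timestamp in milliseconds.
    static String uniquePrefix(String base, long timestampMillis) {
        return base + timestampMillis + ".";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        String inProgressPrefix = presumedUnique(now);
        // With Flink's BucketingSink this value would then be applied via
        // the sink's configuration methods for the in-progress and pending
        // prefixes/suffixes (not shown here).
        System.out.println(inProgressPrefix);
    }

    // Convenience wrapper using the default "_" marker.
    static String presumedUnique(long timestampMillis) {
        return uniquePrefix("_", timestampMillis);
    }
}
```

The idea is simply that a timestamp baked into the file name makes each attempt's temporary files distinct, sidestepping the "concurrent write" check in the EMRFS consistency layer.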

I tried another approach: updating the BucketingSink code itself to append
System.currentTime to the partPath variable (in the openNewPartFile method).

Could you please let me know whether this is the correct approach to get rid
of this exception?

TimerException{java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to create file due to concurrent write, file corrupted potentially: s3://<bucket-name>/part-0-0inprogress
        at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:245)
        at com.amazon.ws.emr.hadoop.fs.consistency.ConsistencyCheckerS3FileSystem$1.execute(ConsistencyCheckerS3FileSystem.java:201)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:188)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:147)
        at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.close(SequenceFileWriter.java:116)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)


Regards,
Vinay Patil

