Posted to issues@beam.apache.org by "Pawel Bartoszek (JIRA)" <ji...@apache.org> on 2018/11/01 16:00:00 UTC

[jira] [Updated] (BEAM-5934) FileSink affected by S3 eventual consistency

     [ https://issues.apache.org/jira/browse/BEAM-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pawel Bartoszek updated BEAM-5934:
----------------------------------
    Description: 
After upgrading to BEAM 2.7.0 and Flink 1.5.2, the job throws a _No such file or directory_ exception a few times every day when trying to move a temp bundle to its final location.

After digging into the code, it looks like an S3 eventual consistency problem: the HEAD request that checks whether the temporary file exists before it is copied to the final location returns 404, and the whole copy operation fails.

We use sharded writes (32 shards). The job outputs around 256 files a minute, but the exception is thrown at most 3 times a day, which suggests there is a race condition somewhere.

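For a sense of scale, here is the arithmetic behind "at most 3 times a day", assuming the 256 files a minute rate holds around the clock (an assumption; real traffic varies). At that rate the job finalizes 368,640 files a day, so at most about 1 in 122,880 copies fails. The class name is made up for illustration.

{code:java}
// Back-of-the-envelope failure rate using the figures from this report:
// ~256 output files per minute, at most 3 failed finalizations per day.
// Assumes a steady rate for the whole day.
public final class FailureRate {
    static final long FILES_PER_MINUTE = 256;
    static final long MINUTES_PER_DAY = 24 * 60;
    static final long MAX_FAILURES_PER_DAY = 3;

    /** Files finalized per day at a steady 256 files/minute. */
    static long filesPerDay() {
        return FILES_PER_MINUTE * MINUTES_PER_DAY;
    }

    /** Upper bound on the fraction of temp-file copies that fail. */
    static double failureRate() {
        return (double) MAX_FAILURES_PER_DAY / filesPerDay();
    }

    public static void main(String[] args) {
        System.out.println("files/day = " + filesPerDay());
        System.out.printf("failure rate <= %.2e (about 1 in %d files)%n",
                failureRate(), filesPerDay() / MAX_FAILURES_PER_DAY);
    }
}
{code}
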
S3 only falls back to eventual consistency for a new object when an existence check (HEAD or GET) on its key is made before the object is uploaded. I checked the BEAM FileSink and couldn't find any logic that pre-checks whether the temp bundle file exists. The caveat from the S3 consistency documentation:

{code:java}
Amazon S3 provides read-after-write consistency for PUTS of new objects in your S3 bucket in all regions with one caveat. The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides eventual consistency for read-after-write.{code}
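
To make the caveat concrete, here is a toy in-memory model; it is not how S3 is actually implemented. A HEAD that misses before the object exists leaves a negative entry that keeps answering 404 for a hypothetical staleness window (`staleMillis`), even after the PUT succeeds, while a PUT with no prior check is visible immediately. The class, method and key names are made up for illustration.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Toy, single-process model of the read-after-write caveat (NOT S3's real design):
// a HEAD miss before creation is remembered and can mask the object for a while.
public final class NegativeCacheModel {
    private final Map<String, byte[]> objects = new HashMap<>();
    private final Map<String, Long> negativeUntil = new HashMap<>();
    private final long staleMillis; // hypothetical staleness window

    NegativeCacheModel(long staleMillis) {
        this.staleMillis = staleMillis;
    }

    /** Stores the object; note that it does NOT clear an earlier negative entry. */
    void put(String key, byte[] data) {
        objects.put(key, data);
    }

    /** Returns true if the key is visible to a HEAD issued at nowMillis. */
    boolean head(String key, long nowMillis) {
        Long until = negativeUntil.get(key);
        if (until != null && nowMillis < until) {
            return false; // stale 404 served from the remembered miss
        }
        if (!objects.containsKey(key)) {
            negativeUntil.put(key, nowMillis + staleMillis); // remember this miss
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        NegativeCacheModel s3 = new NegativeCacheModel(3_000);

        // No pre-check: PUT, then HEAD sees the object immediately.
        s3.put("no-precheck", new byte[0]);
        System.out.println("no pre-check, HEAD at t=1s:   " + s3.head("no-precheck", 1_000));

        // Pre-check: HEAD (miss), PUT, then HEADs inside and after the window.
        s3.head("precheck", 0);
        s3.put("precheck", new byte[0]);
        System.out.println("with pre-check, HEAD at t=1s: " + s3.head("precheck", 1_000));
        System.out.println("with pre-check, HEAD at t=5s: " + s3.head("precheck", 5_000));
    }
}
{code}

With a 3 s window, the pre-checked key reports `false` at t=1s and `true` at t=5s, which is the shape of the failure seen in this job.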

Logs from the job:
{code:java}
2018-10-29 17:45:03,873 INFO org.apache.beam.sdk.io.WriteFiles - Opening writer f990d5a0-d5a8-4ce2-adee-baa01e294ae4 for window [2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z) pane PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null

2018-10-29 17:45:04,043 INFO org.apache.beam.sdk.io.FileBasedSink$Writer - Successfully wrote temporary file s3a:/XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4

2018-10-29 17:45:05,437 INFO org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file FileResult{tempFilename=s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4, shard=9, window=[2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z), paneInfo=PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location s3a://XXX/rdot-17:43-17:44-pane-0-on_time-first-9.gz{code}
 
{code:java}
Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:128)
at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:628)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:80)
at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:204)
at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:71)
at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:367)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
at org.apache.beam.sdk.io.hdfs.HadoopFileSystem.copy(HadoopFileSystem.java:132)
at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:288)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:761)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:797){code}
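
One possible mitigation, sketched under assumptions: retry the copy step with exponential backoff when it fails with `FileNotFoundException`. Per the trace, the step that would need wrapping in this job is the copy issued from `FileBasedSink$WriteOperation.moveToOutputFiles`. The helper below (`RetryOnNotFound`, `runWithRetry`, `IoAction`) is hypothetical and not part of Beam or Hadoop. It only papers over the stale 404; it does not remove the pre-creation HEAD that triggers it.

{code:java}
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: retry an I/O step that may transiently see a stale 404.
public final class RetryOnNotFound {

    /** A copy-like step that may throw IOException. */
    @FunctionalInterface
    interface IoAction {
        void run() throws IOException;
    }

    /**
     * Runs action, retrying only on FileNotFoundException with exponential backoff.
     * Other IOExceptions propagate immediately; after maxAttempts the last
     * FileNotFoundException is rethrown.
     */
    static void runWithRetry(IoAction action, int maxAttempts, long initialBackoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return;
            } catch (FileNotFoundException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up: surface the last "No such file or directory"
                }
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated copy that sees a stale 404 twice before succeeding.
        AtomicInteger calls = new AtomicInteger();
        runWithRetry(() -> {
            if (calls.incrementAndGet() <= 2) {
                throw new FileNotFoundException("No such file or directory (simulated)");
            }
        }, 5, 10);
        System.out.println("succeeded after " + calls.get() + " attempts");
    }
}
{code}

Retrying only on `FileNotFoundException` keeps genuine I/O errors (permissions, network) failing fast instead of being masked.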

CloudTrail logs (note that the order of API calls shown may not reflect reality, because eventtime has only one-second precision):
||eventtime||operation||errorcode||requestparameters||
|2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:03Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
|2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
|2018-10-29T17:45:04Z|PutObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:05Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:06Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
|2018-10-29T17:45:06Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|

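A small sketch that replays the CloudTrail rows for the temp-file key and labels every `NoSuchKey` HEAD as either a pre-creation miss (before the first PutObject, the pattern the S3 caveat describes) or a stale 404 after the PUT. The class, field and method names are hypothetical. Rows for the trailing-slash "directory" key and the ListObjects prefix are skipped because only the exact key matters here, and since eventtime has one-second precision the order within a second is taken as listed, not as guaranteed.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Classifies HeadObject NoSuchKey events for one key relative to its first PutObject.
public final class CloudTrailScan {
    static final String KEY =
            "beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4";

    /** One CloudTrail row: time, operation, key (or ListObjects prefix), NoSuchKey flag. */
    static final class Event {
        final String time;
        final String op;
        final String key;
        final boolean noSuchKey;

        Event(String time, String op, String key, boolean noSuchKey) {
            this.time = time;
            this.op = op;
            this.key = key;
            this.noSuchKey = noSuchKey;
        }
    }

    /** The rows from the CloudTrail table, in the listed order. */
    static List<Event> sampleEvents() {
        String dir = KEY + "/";
        return Arrays.asList(
                new Event("2018-10-29T17:45:03Z", "HeadObject", KEY, true),
                new Event("2018-10-29T17:45:03Z", "ListObjects", dir, false),
                new Event("2018-10-29T17:45:03Z", "HeadObject", dir, true),
                new Event("2018-10-29T17:45:04Z", "PutObject", KEY, false),
                new Event("2018-10-29T17:45:05Z", "HeadObject", KEY, false),
                new Event("2018-10-29T17:45:06Z", "ListObjects", dir, false),
                new Event("2018-10-29T17:45:06Z", "HeadObject", KEY, false),
                new Event("2018-10-29T17:45:06Z", "HeadObject", KEY, true),
                new Event("2018-10-29T17:45:06Z", "HeadObject", dir, true));
    }

    /** Labels each NoSuchKey HEAD on key as happening before or after the first PUT. */
    static List<String> classify(List<Event> events, String key) {
        List<String> findings = new ArrayList<>();
        boolean putSeen = false;
        for (Event e : events) {
            if (!e.key.equals(key)) {
                continue; // skip trailing-slash "directory" probes and prefixes
            }
            if (e.op.equals("PutObject")) {
                putSeen = true;
            } else if (e.op.equals("HeadObject") && e.noSuchKey) {
                findings.add((putSeen ? "stale 404 after PUT at " : "pre-create HEAD miss at ") + e.time);
            }
        }
        return findings;
    }

    public static void main(String[] args) {
        for (String finding : classify(sampleEvents(), KEY)) {
            System.out.println(finding);
        }
    }
}
{code}

On these rows it reports the 17:45:03 miss as a pre-creation HEAD and the 17:45:06 miss as a stale 404 after the 17:45:04 PUT.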
  was:
After upgrading to BEAM 2.7.0 and Flink 1.5.2 every day few times a day job throws _No such file or directory_ exception when trying to move temp bundle to the final location.

After digging into the code it looks like it's kind of S3 eventual consistency problem. Where the HEAD request, used to check if the temporary file exists before copying it to final location, returns 404 and the whole copy operation fails.

We use sharded writes(32). Job output arounds 256 files a minute. But the exception is thrown max 3 times a day - which suggest that there is some race condition somewhere.

 

The case where S3 enforces eventual consistency is where the check if the file exist is being made before uploading the file. I checked the BEAM FileSink and couldn't find any logic that pre-check if the temp bundle file exists. 

 
{code:java}
Amazon S3 provides read-after-write consistency for PUTS of new objects in your S3 bucket in all regions with one caveat. The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides eventual consistency for read-after-write.{code}
 

The logs from the job
{code:java}
2018-10-29 17:45:03,873 INFO org.apache.beam.sdk.io.WriteFiles - Opening writer f990d5a0-d5a8-4ce2-adee-baa01e294ae4 for window [2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z) pane PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null

2018-10-29 17:45:04,043 INFO org.apache.beam.sdk.io.FileBasedSink$Writer - Successfully wrote temporary file s3a:/XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4

2018-10-29 17:45:05,437 INFO org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file FileResult{tempFilename=s3a://XXX/beam/.temp-beam-2018-

10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4, shard=9, window=[2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z), paneInfo=PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location s3a://XXX/rdot-17:43-17:44-pane-0-on_time-first-9.gz{code}
 
{code:java}
Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:128)
at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:628)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:80)
at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:204)
at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:71)
at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:367)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
at org.apache.beam.sdk.io.hdfs.HadoopFileSystem.copy(HadoopFileSystem.java:132)
at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:288)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:761)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:797){code}
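
The trace shows where the job fails: FinalizeFn calls FileBasedSink$WriteOperation.moveToOutputFiles, which goes through FileSystems.copy and HadoopFileSystem.copy into Hadoop's FileUtil.copy, and the getFileStatus lookup inside S3AFileSystem.open throws FileNotFoundException. One possible mitigation, sketched here under stated assumptions, is to retry the copy a bounded number of times with exponential backoff when that exception is thrown, giving S3 time to converge. RetryingCopy and withRetries are hypothetical names, not part of Beam or Hadoop; the demo simulates the stale 404 with a counter instead of calling S3, and in a real pipeline the Callable would wrap the copy call instead.

{code:java}
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not part of Beam or Hadoop): retries an operation that may fail
 * with FileNotFoundException while an eventually consistent store catches up.
 */
public final class RetryingCopy {

  private RetryingCopy() {}

  /**
   * Runs op, making at most maxAttempts attempts in total. Only FileNotFoundException
   * triggers a retry; the sleep between attempts doubles each time.
   */
  public static <T> T withRetries(Callable<T> op, int maxAttempts, long initialBackoffMillis)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    long backoff = initialBackoffMillis;
    for (int attempt = 1; ; attempt++) {
      try {
        return op.call();
      } catch (FileNotFoundException e) {
        if (attempt >= maxAttempts) {
          throw e; // give up: the file still looks missing after all attempts
        }
        Thread.sleep(backoff);
        backoff *= 2;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Simulated copy that "cannot see" the temp file on the first two attempts,
    // standing in for the stale 404 seen in this issue.
    AtomicInteger calls = new AtomicInteger();
    String result = withRetries(() -> {
      if (calls.incrementAndGet() <= 2) {
        throw new FileNotFoundException(
            "No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4");
      }
      return "copied";
    }, 5, 10);
    System.out.println(result + " after " + calls.get() + " attempts");
  }
}
{code}

Running main prints "copied after 3 attempts". Catching only FileNotFoundException keeps other I/O failures failing fast, and if the temp file is genuinely missing the exception is still rethrown once maxAttempts is reached, so retrying only masks the consistency window, not real data loss.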
 

CloudTrail logs (please note that the order of API calls shown here may not reflect the real order, as eventtime has only second precision)
||eventtime||eventname||errorcode||errormessage||requestparameters||
|2018-10-29T17:45:03Z|HeadObject|NoSuchKey|The specified key does not exist.|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:03Z|ListObjects| | |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
|2018-10-29T17:45:03Z|HeadObject|NoSuchKey|The specified key does not exist.|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
|2018-10-29T17:45:04Z|PutObject| | |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:05Z|HeadObject| | |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:06Z|ListObjects| | |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
|2018-10-29T17:45:06Z|HeadObject| | |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:06Z|HeadObject|NoSuchKey|The specified key does not exist.|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
|2018-10-29T17:45:06Z|HeadObject|NoSuchKey|The specified key does not exist.|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
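
The table shows the pattern described by the S3 caveat quoted in this issue: at 17:45:03 a HeadObject on the temp key returns NoSuchKey before the PutObject at 17:45:04, and at 17:45:06, after successful HeadObject calls, another HeadObject on the same key again returns NoSuchKey. The sketch below checks a list of events for exactly that sequence: a negative lookup on a key before its first PUT, followed by another negative lookup after the PUT. The S3Event class and staleAfterPut method are hypothetical helpers, the events are transcribed by hand from the table, and because eventtime has only second precision the check relies on the listed order, which may differ from the true order.

{code:java}
import java.util.Arrays;
import java.util.List;

public final class ConsistencyCheck {

  /** Hypothetical minimal view of one CloudTrail row: operation, key or prefix, error code (null if OK). */
  public static final class S3Event {
    final String operation;
    final String key;
    final String errorCode;

    public S3Event(String operation, String key, String errorCode) {
      this.operation = operation;
      this.key = key;
      this.errorCode = errorCode;
    }

    boolean isMiss() {
      return "HeadObject".equals(operation) && "NoSuchKey".equals(errorCode);
    }
  }

  /**
   * Returns true if key was looked up and missed before its first successful PutObject
   * (the case where S3 only promises eventual consistency) and was then reported missing
   * again after that PutObject. Events must be in chronological order.
   */
  public static boolean staleAfterPut(List<S3Event> events, String key) {
    boolean missBeforePut = false;
    boolean put = false;
    for (S3Event e : events) {
      if (!key.equals(e.key)) {
        continue; // ignore ListObjects prefixes and the "key/" directory probes
      }
      if ("PutObject".equals(e.operation) && e.errorCode == null) {
        put = true;
      } else if (e.isMiss()) {
        if (!put) {
          missBeforePut = true;
        } else if (missBeforePut) {
          return true;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    String key = "beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4";
    // Rows of the CloudTrail table above, in the listed order.
    List<S3Event> events = Arrays.asList(
        new S3Event("HeadObject", key, "NoSuchKey"),
        new S3Event("ListObjects", key + "/", null),
        new S3Event("HeadObject", key + "/", "NoSuchKey"),
        new S3Event("PutObject", key, null),
        new S3Event("HeadObject", key, null),
        new S3Event("ListObjects", key + "/", null),
        new S3Event("HeadObject", key, null),
        new S3Event("HeadObject", key, "NoSuchKey"),
        new S3Event("HeadObject", key + "/", "NoSuchKey"));
    System.out.println("stale read after PUT: " + staleAfterPut(events, key));
  }
}
{code}

For the rows above, main prints "stale read after PUT: true". A miss after a PUT with no earlier miss is deliberately not flagged, since that would point to something other than the documented caveat (for example a delete).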


> FileSink affected by S3 eventual consistency
> --------------------------------------------
>
>                 Key: BEAM-5934
>                 URL: https://issues.apache.org/jira/browse/BEAM-5934
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-hadoop
>    Affects Versions: 2.7.0
>            Reporter: Pawel Bartoszek
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> After upgrading to Beam 2.7.0 and Flink 1.5.2, the job throws a _No such file or directory_ exception a few times every day when trying to move a temp bundle to its final location.
> After digging into the code, it looks like an S3 eventual-consistency problem: the HEAD request used to check whether the temporary file exists before copying it to the final location returns 404, and the whole copy operation fails.
> We use sharded writes (32 shards). The job outputs around 256 files a minute, but the exception is thrown at most 3 times a day, which suggests there is a race condition somewhere.
>  
> S3 only falls back to eventual consistency when a check for the file's existence is made before the file is uploaded. I checked the Beam FileSink and couldn't find any logic that pre-checks whether the temp bundle file exists. 
>  
> {code:java}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in your S3 bucket in all regions with one caveat. The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides eventual consistency for read-after-write.{code}
>  
> The logs from the job
> {code:java}
> 2018-10-29 17:45:03,873 INFO org.apache.beam.sdk.io.WriteFiles - Opening writer f990d5a0-d5a8-4ce2-adee-baa01e294ae4 for window [2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z) pane PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
> 2018-10-29 17:45:04,043 INFO org.apache.beam.sdk.io.FileBasedSink$Writer - Successfully wrote temporary file s3a:/XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> 2018-10-29 17:45:05,437 INFO org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file FileResult{tempFilename=s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4, shard=9, window=[2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z), paneInfo=PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location s3a://XXX/rdot-17:43-17:44-pane-0-on_time-first-9.gz{code}
>  
> {code:java}
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
> at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
> at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
> at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:128)
> at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:628)
> at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:80)
> at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:204)
> at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
> at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
> at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:71)
> at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
> at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
> at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
> at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
> at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
> at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
> at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
> at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
> at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
> at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
> at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
> at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
> at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:367)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
> at org.apache.beam.sdk.io.hdfs.HadoopFileSystem.copy(HadoopFileSystem.java:132)
> at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:288)
> at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:761)
> at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:797){code}
>  
> CloudTrail logs (please note that the order of API calls shown here may not reflect the real order, as eventtime has only second precision)
> ||eventtime||operation||errorcode||requestparameters||
> |2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:03Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
> |2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
> |2018-10-29T17:45:04Z|PutObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:05Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
> |2018-10-29T17:45:06Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
> |2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)